zoukankan      html  css  js  c++  java
  • Dubbo实践(七)扩展点

    与JDK的SPI机制类似,Dubbo也在META-INF路径下定义了多种扩展接口。只是JDK SPI机制是Java后台帮你实现读取文件并对接具体的实现类,而Dubbo是自己去读文件。

    扩展点配置

    扩展点机制有几个要点:

    1. 根据关键字去读取配置文件,获得具体的实现类

    比如在dubbo-provider.xml文件中配置:

    <dubbo:protocol name="dubbo" port="20880" />

    则会根据dubbo去读取具体的协议实现类DubboProtocol.java

    2. 注解@SPI和@Adaptive

    • @SPI注解:可以认为是定义默认实现类;比如Protocol接口中,定义默认协议时dubbo;
    • @Adaptive注解:该注解打在接口方法上;调ExtensionLoader.getAdaptiveExtension()获取适配类,会先通过前面的过程生成java的源代码,再通过编译器编译成class加载。但是Compiler的实现策略选择也是通过ExtensionLoader.getAdaptiveExtension(),如果也通过编译器编译成class文件那岂不是要死循环下去了吗?

    此时分析ExtensionLoader.getAdaptiveExtension()函数,它获取适配类不再通过前面过程生成适配类java源代码,而是在读取扩展文件的时候遇到实现类打了注解@Adaptive就把这个类作为适配类缓存在ExtensionLoader中,调用时直接返回。

    3. filter和listener

    在生成具体的实现类对象时,不是直接读取类文件,而是在读取类文件的基础上,通过filter和listener去封装类对象。

    扩展点加载流程

    private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

    在这个例子中,首先Protocol类带有SPI注解,因此我们可以确认默认是使用dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol作为默认扩展点。

    /**
     * Protocol. (API/SPI, Singleton, ThreadSafe)
     */
    @SPI("dubbo")
    public interface Protocol {
    
        /**
         * Get default port when user doesn't config the port.
         *
         * @return default port
         */
        int getDefaultPort();
    
        /**
         * 暴露远程服务:<br>
         * 1. 协议在接收请求时,应记录请求来源方地址信息:RpcContext.getContext().setRemoteAddress();<br>
         * 2. export()必须是幂等的,也就是暴露同一个URL的Invoker两次,和暴露一次没有区别。<br>
         * 3. export()传入的Invoker由框架实现并传入,协议不需要关心。<br>
         *
         * @param <T>     服务的类型
         * @param invoker 服务的执行体
         * @return exporter 暴露服务的引用,用于取消暴露
         * @throws RpcException 当暴露服务出错时抛出,比如端口已占用
         */
        @Adaptive
        <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
    
        /**
         * 引用远程服务:<br>
         * 1. 当用户调用refer()所返回的Invoker对象的invoke()方法时,协议需相应执行同URL远端export()传入的Invoker对象的invoke()方法。<br>
         * 2. refer()返回的Invoker由协议实现,协议通常需要在此Invoker中发送远程请求。<br>
         * 3. 当url中有设置check=false时,连接失败不能抛出异常,需内部自动恢复。<br>
         *
         * @param <T>  服务的类型
         * @param type 服务的类型
         * @param url  远程服务的URL地址
         * @return invoker 服务的本地代理
         * @throws RpcException 当连接服务提供方失败时抛出
         */
        @Adaptive
        <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
    
        /**
         * 释放协议:<br>
         * 1. 取消该协议所有已经暴露和引用的服务。<br>
         * 2. 释放协议所占用的所有资源,比如连接和端口。<br>
         * 3. 协议在释放后,依然能暴露和引用新的服务。<br>
         */
        void destroy();
    
    }

    对应的函数为:

    @SuppressWarnings("unchecked")
        public T getAdaptiveExtension() {
            Object instance = cachedAdaptiveInstance.get();
            if (instance == null) {
                if (createAdaptiveInstanceError == null) {
                    synchronized (cachedAdaptiveInstance) {
                        instance = cachedAdaptiveInstance.get();
                        if (instance == null) {
                            try {
                                instance = createAdaptiveExtension();
                                cachedAdaptiveInstance.set(instance);
                            } catch (Throwable t) {
                                createAdaptiveInstanceError = t;
                                throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
                            }
                        }
                    }
                } else {
                    throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
                }
            }
    
            return (T) instance;
        }

    这里看到createAdaptiveExtension函数:

    @SuppressWarnings("unchecked")
        private T createAdaptiveExtension() {
            try {
                return injectExtension((T) getAdaptiveExtensionClass().newInstance());
            } catch (Exception e) {
                throw new IllegalStateException("Can not create adaptive extension " + type + ", cause: " + e.getMessage(), e);
            }
        }
    private Class<?> getAdaptiveExtensionClass() {
            getExtensionClasses();
            if (cachedAdaptiveClass != null) {
                return cachedAdaptiveClass;
            }
            return cachedAdaptiveClass = createAdaptiveExtensionClass();
        }

    若有cachedAdaptiveClass对象,则直接返回,否则通过生成类文件,然后complier出来。

    此时我们分析getExtension函数:

    /**
         * 返回指定名字的扩展。如果指定名字的扩展不存在,则抛异常 {@link IllegalStateException}
         * will be thrown.
         */
        @SuppressWarnings("unchecked")
        public T getExtension(String name) {
            if (name == null || name.length() == 0)
                throw new IllegalArgumentException("Extension name == null");
            if ("true".equals(name)) {
                return getDefaultExtension();
            }
            Holder<Object> holder = cachedInstances.get(name);
            if (holder == null) {
                cachedInstances.putIfAbsent(name, new Holder<Object>());
                holder = cachedInstances.get(name);
            }
            Object instance = holder.get();
            if (instance == null) {
                synchronized (holder) {
                    instance = holder.get();
                    if (instance == null) {
                        instance = createExtension(name);
                        holder.set(instance);
                    }
                }
            }
            return (T) instance;
        }

    此时我们分析createExtension:

    @SuppressWarnings("unchecked")
        private T createExtension(String name) {
            Class<?> clazz = getExtensionClasses().get(name);
            if (clazz == null) {
                throw findException(name);
            }
            try {
                T instance = (T) EXTENSION_INSTANCES.get(clazz);
                if (instance == null) {
                    EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
                    instance = (T) EXTENSION_INSTANCES.get(clazz);
                }
                injectExtension(instance);
                Set<Class<?>> wrapperClasses = cachedWrapperClasses;
                if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
                    for (Class<?> wrapperClass : wrapperClasses) {
                        instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                    }
                }
                return instance;
            } catch (Throwable t) {
                throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
                        type + ")  could not be instantiated: " + t.getMessage(), t);
            }
        }

    而这里injectExtension类,则是为生成的instance注入变量; 其目标是搜索所有set开头,同时只有一个入参的函数,执行该函数,对变量进行注入:

    private T injectExtension(T instance) {
            try {
                if (objectFactory != null) {
                    for (Method method : instance.getClass().getMethods()) {
                        if (method.getName().startsWith("set")
                                && method.getParameterTypes().length == 1
                                && Modifier.isPublic(method.getModifiers())) {
                            Class<?> pt = method.getParameterTypes()[0];
                            try {
                                String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
                                Object object = objectFactory.getExtension(pt, property);
                                if (object != null) {
                                    method.invoke(instance, object);
                                }
                            } catch (Exception e) {
                                logger.error("fail to inject via method " + method.getName()
                                        + " of interface " + type.getName() + ": " + e.getMessage(), e);
                            }
                        }
                    }
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
            return instance;
        }

    此时我们的目光转到如下一段代码:

    Set<Class<?>> wrapperClasses = cachedWrapperClasses;
                if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
                    for (Class<?> wrapperClass : wrapperClasses) {
                        instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                    }
                }

    在分析这段代码的作用之前,我们先来分析一下 Set<Class<?>> cachedWrapperClasses是如何被赋值的,此时我们转到 private void loadFile(Map<String, Class<?>> extensionClasses, String dir) 函数:

    if (clazz.isAnnotationPresent(Adaptive.class)) {
                if (cachedAdaptiveClass == null) {
                    cachedAdaptiveClass = clazz;
                } else if (!cachedAdaptiveClass.equals(clazz)) {
                    throw new IllegalStateException("More than 1 adaptive class found: "
                            + cachedAdaptiveClass.getClass().getName()
                            + ", " + clazz.getClass().getName());
                }
            } else {
                try {
                    clazz.getConstructor(type);
                    Set<Class<?>> wrappers = cachedWrapperClasses;
                    if (wrappers == null) {
                        cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
                        wrappers = cachedWrapperClasses;
                    }
                    wrappers.add(clazz);
                } catch (NoSuchMethodException e) {
                    clazz.getConstructor();
                    if (name == null || name.length() == 0) {
                        name = findAnnotationName(clazz);
                        if (name == null || name.length() == 0) {
                            if (clazz.getSimpleName().length() > type.getSimpleName().length()
                                    && clazz.getSimpleName().endsWith(type.getSimpleName())) {
                                name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase();
                            } else {
                                throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url);
                            }
                        }
                    }
                    String[] names = NAME_SEPARATOR.split(name);
                    if (names != null && names.length > 0) {
                        Activate activate = clazz.getAnnotation(Activate.class);
                        if (activate != null) {
                            cachedActivates.put(names[0], activate);
                        }
                        for (String n : names) {
                            if (!cachedNames.containsKey(clazz)) {
                                cachedNames.put(clazz, n);
                            }
                            Class<?> c = extensionClasses.get(n);
                            if (c == null) {
                                extensionClasses.put(n, clazz);
                            } else if (c != clazz) {
                                throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
                            }
                        }
                    }
                }
            }

    这里实际上是如果该类带有Adaptive注解,则认为是cachedAdaptiveClass;若该类没有Adaptive注解,则判断该类是否带有参数是type类型的构造函数,若有,则认为是wrapper类。

    于是我们分析文件META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol 其内容为:

    filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
    listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
    mock=com.alibaba.dubbo.rpc.support.MockProtocol

    我们分析这三个类,会发现mock类没有参数为Protocol的自定义参数,而其他两个均有; 此时我们返回到createExtension函数:

    Set<Class<?>> wrapperClasses = cachedWrapperClasses;
                if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
                    for (Class<?> wrapperClass : wrapperClasses) {
                        instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                    }
                }

    此时可以发现这里对instance加了装饰类;对于Protocol来说加了两个装饰类 ProtocolFilterWrapper和ProtocolListenerWrapper; 也就injectExtension 实例化包装类,并注入接口的适配器, 注意这个地方返回的是最后一个包装类。

  • 相关阅读:
    【MySQL 5.7 Reference Manual】15.4.2 Change Buffer(变更缓冲)
    从MySQL slave system lock延迟说开去
    一文搞懂Raft算法
    Relinking Oracle Home FAQ ( Frequently Asked Questions) (Doc ID 1467060.1)
    Oracle 单实例 Relink Binary Options 说明
    oracle upgrade best pratics
    sysbench安装、使用、结果解读
    Linux 进程管理之四大名捕
    iOS 严重问题解释(crash)
    iOS 图片剪切和压缩的几个方法
  • 原文地址:https://www.cnblogs.com/warehouse/p/9336293.html
Copyright © 2011-2022 走看看