zoukankan      html  css  js  c++  java
  • Dubbo学习摘录(二)

    扩展点机制

    扩展点的配置

    (1)根据关键字读取配置,获取具体的实现类
    比如在 dubbo-demo-provider.xml 文件中配置:

    <dubbo:service protocol="rmi" interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" />

    则会根据rmi去读取具体的协议实现类RmiProtocol.java 。
    (2)@SPI和@Adaptive

    • @SPI注解,可以认为是定义默认的实现类;
    • @Adaptive注解:该注解打在接口的方法上;调用ExtensionLoader.getAdaptiveExtension()获取适配类,会先通过前面的过程生成 java 的源代码,在通过编译器编译成 class 加载。

    扩展点加载流程

    以如下例子为例:

    private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
    

    在这个语句中,我来分析,首先 Protocol 类带有 SPI 注解,因此我们可以确认默认是使用 ***dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol ***作为默认扩展点。

    @SPI("dubbo")
    public interface Protocol {
        /**
         * 获取缺省端口,当用户没有配置端口时使用。 *
         *
         * @return 缺省端口
         */
        int getDefaultPort();
    
        /**
         * 暴露远程服务:<br>
         * 1. 协议在接收请求时,应记录请求来源方地址信息: RpcContext.getContext().setRemoteAddress();<br>
         * 2. export()必须是幂等的,也就是暴露同一个URL的Invoker两次,和暴露一次没 有区别。<br>
         * 3. export()传入的Invoker由框架实现并传入,协议不需要关心。<br> *
         *
         * @param <T>     服务的类型
         * @param invoker 服务的执行体
         * @return exporter 暴露服务的引用,用于取消暴露
         * @throws RpcException 当暴露服务出错时抛出,比如端口已占用
         */
        @Adaptive
        <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
    /**
     * 引用远程服务:<br>
     * 1. 当用户调用refer()所返回的Invoker对象的invoke()方法时,协议需相应执行
     同URL远端export()传入的Invoker对象的invoke()方法。<br>
     * 2. refer()返回的Invoker由协议实现,协议通常需要在此Invoker中发送远程请
     求。<br>
     * 3. 当url中有设置check=false时,连接失败不能抛出异常,并内部自动恢复。<br> *
     *
     *
     *
     *
     *
     */
        @param <T>服务的类型
        @param
        type 服务的类型
        @param
        url 远程服务的URL地址
    @return
        invoker 服务的本地代理
    @throws
        RpcException 当连接服务
        供方失败时抛出
        @Adaptive
    
        <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
    
        /**
         * 释放协议:<br>
         * 1. 取消该协议所有已经暴露和引用的服务。<br>
         * 2. 释放协议所占用的所有资源,比如连接和端口。<br> * 3. 协议在释放后,依然能暴露和引用新的服务。<br>
         */
        void destroy();
    }
    

    这里对应的函数为:

    @SuppressWarnings("unchecked")
        public T getAdaptiveExtension() {
            Object instance = cachedAdaptiveInstance.get();
            if (instance == null) {
                if (createAdaptiveInstanceError == null) {
                    synchronized (cachedAdaptiveInstance) {
                        instance = cachedAdaptiveInstance.get();
                        if (instance == null) {
                            try {
                                instance = createAdaptiveExtension();
                                cachedAdaptiveInstance.set(instance);
                            } catch (Throwable t) {
                                createAdaptiveInstanceError = t;
                                throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
                            }
                        }
                    }
                } else {
                    throw new IllegalStateException("fail to create adaptive
                            instance:" + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
                }
            }
            return (T) instance;
        }
    

    这里看到 createAdaptiveExtension 函数:

    @SuppressWarnings("unchecked")
        private T createAdaptiveExtension() {
            try {
                return injectExtension((T)
                        getAdaptiveExtensionClass().newInstance());
            } catch (Exception e) {
                throw new IllegalStateException("Can not create adaptive extenstion " + type + ", cause: " + e.getMessage(), e);
            }
        }
    
        private Class<?> getAdaptiveExtensionClass() {
            getExtensionClasses();
            if (cachedAdaptiveClass != null) {
                return cachedAdaptiveClass;
            }
            return cachedAdaptiveClass = createAdaptiveExtensionClass();
        }
    

    动态生成类
    而 cachedAdaptiveInstance 类则是若有 cachedAdaptiveClass 对象,则直接返回,否则通 过生成类文件,然后 complier 出来的,其文件内容如下:从该文件可以很容易看出只有标 注了@Adaptive 注释的函数会在运行时动态的决定扩展点实现;

    package com.alibaba.dubbo.rpc;
            import com.alibaba.dubbo.common.extension.ExtensionLoader;
    
    public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
        public void destroy() {
            throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
        }
    
        public int getDefaultPort() {
            throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
        }
    
        public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
            if (arg0 == null) throw new
                    IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
            if (arg0.getUrl() == null)
                throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
            com.alibaba.dubbo.common.URL url = arg0.getUrl();
            String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
            if (extName == null)
                throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
            com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(co m.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
            return extension.export(arg0);
        }
    
        public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws java.lang.Class {
            if (arg1 == null) throw new IllegalArgumentException("url ==
            null ");
            com.alibaba.dubbo.common.URL url = arg1;
            String extName = (url.getProtocol() == null ? "dubbo" :
                    url.getProtocol());
            if (extName == null)
                throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
            com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(co m.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
            return extension.refer(arg0, arg1);
        }
    }
    

    此时我们分析 getExtension 函数:

    /**
         * 返回指定名字的扩展。如果指定名字的扩展不存在,则抛异常 {@link IllegalStateException}.
         *
         * @param name * @return
         */
        @SuppressWarnings("unchecked")
        public T getExtension(String name) {
            if (name == null || name.length() == 0)
                throw new IllegalArgumentException("Extension name == null");
            if ("true".equals(name)) {
                return getDefaultExtension();
            }
            Holder<Object> holder = cachedInstances.get(name);
            if (holder == null) {
                cachedInstances.putIfAbsent(name, new Holder<Object>());
                holder = cachedInstances.get(name);
            }
            Object instance = holder.get();
            if (instance == null) {
                synchronized (holder) {
                    instance = holder.get();
                    if (instance == null) {
                        instance = createExtension(name);
                        holder.set(instance);
                    }
                }
            }
            return (T) instance;
        }
    

    此时我们分析 createExtension,

    @SuppressWarnings("unchecked")
        private T createExtension(String name) {
            Class<?> clazz = getExtensionClasses().get(name);
            if (clazz == null) {
                throw findException(name);
            }
            try {
                T instance = (T) EXTENSION_INSTANCES.get(clazz);
                if (instance == null) {
                    EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
                    instance = (T) EXTENSION_INSTANCES.get(clazz);
                }
                injectExtension(instance);
                Set<Class<?>> wrapperClasses = cachedWrapperClasses;
                if (wrapperClasses != null && wrapperClasses.size() > 0) {
                    for (Class<?> wrapperClass : wrapperClasses) {
                        instance = injectExtension((T)
                                wrapperClass.getConstructor(type).newInstance(instance));
                    }
                }
                return instance;
            } catch (Throwable t) {
                throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
                        type + ") couldnotbeinstantiated:" + t.getMessage(),
                        t);
            }
        }
    

    而这里 injectExtension 类,则是为生成的 instance 注入变量; 其目标是搜索所有 set 开头,同时只有一个入参的函数,执行该函数,对变量进行注入;

    private T injectExtension(T instance) {
            try {
                if (objectFactory != null) {
                    for (Method method : instance.getClass().getMethods()) {
                        if (method.getName().startsWith("set")
                                && method.getParameterTypes().length == 1
                                && Modifier.isPublic(method.getModifiers())) {
                            Class<?> pt = method.getParameterTypes()[0];
                            try {
                                String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() +
                                        method.getName().substring(4) : "";
                                Object object = objectFactory.getExtension(pt,
                                        property);
                                if (object != null) {
                                    method.getName()
                                    " + e.getMessage(), e);
                                }
                            }
                        }
                    }
                } catch(Exception e){
                    method.invoke(instance, object);
                }
            } catch (Exception e) {
                logger.error("fail to inject via method " +
                                +" of interface " + type.getName() + ":
                        logger.error(e.getMessage(), e);
            }
            return instance;
        }
    

    此时我们的目光转到如下一段代码:

    Set<Class<?>> wrapperClasses = cachedWrapperClasses;
        if(wrapperClasses !=null&&wrapperClasses.size()>0)
    
        {
            for (Class<?> wrapperClass : wrapperClasses) {
                instance = injectExtension((T)
                        wrapperClass.getConstructor(type).newInstance(instance));
            }
        }
    
    

    在分析这段代码的作用之前,我们先来分析一下Set<Class<?>> cachedWrapperClasses是如何被赋值的;
    此时我们转到,private void loadFile(Map<String, Class<?>> extensionClasses, String dir) 函数:

    分析:这里实际上是如果该类带有 Adaptive 注解,则认为是 cachedAdaptiveClass;若该 类没有 Adaptive 注解,则判断该类是否带有参数是 type 类型的构造函数,若有,则认为是wrapper 类;
    于是我们分析如下文件 dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol 其内容为:

    filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
    listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
    mock=com.alibaba.dubbo.rpc.support.MockProtocol

    我们分析这三个类,会发现 mock 类没有参数为 Protocol 的自定义参数,而其他两个均有; 此时我们返回到 createExtension 函数:

        Set<Class<?>> wrapperClasses = cachedWrapperClasses;
        if (wrapperClasses != null && wrapperClasses.size() > 0) { for (Class<?> wrapperClass : wrapperClasses) {
            instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
        } }
    

    此时可以发现这里对 instance 加了装饰类;对于 Protocol 来说加了两个装饰类 ProtocolFilterWrapper 和 ProtocolListenerWrapper;
    也就/injectExtension 实例化包装类,并注入接口的适配器, 注意这个地方返回的是最后一 个包装类。

    扩展点装饰

    Filter
  • 相关阅读:
    [VBS]遍历XML文档
    [VBS]带参数删除扩展名不是*.h、*.c、*.cpp的全部文件
    [VBS]脚本中的字典、动态数组、队列和堆栈
    [VBS]检测计算机各硬件信息
    [cmd]如何设置 Windows 默认命令行窗口大小和缓冲区大小
    VB.NET and C# 差异
    host-only局域网络
    高并发、死锁、幂等性问题
    elasticsearch简单实现
    记一次504 Gateway Time-out
  • 原文地址:https://www.cnblogs.com/jenkov/p/dubby_learning_002.html
Copyright © 2011-2022 走看看