zoukankan      html  css  js  c++  java
  • Dubbo发布过程中,扩展点的加载

    扩展点的功能

    为什么Dubbo要引入扩展点加载功能呢?根本性上是从Dubbo的驱动方式上考虑出发的。Dubbo是基于URL驱动的。譬如我们发布一个Dubbo服务,提供了一个服务者,在一个提供服务接口的同时,我们会提供诸如使用什么协议(Dubbo)、什么注册中心RegistryProtocol(zookeeper)、集群方式cluster(failover)、代理方式ProxyFactory(javaassist),我们在配置服务的属性时,经过Dubbo的ServiceBean和ReferenceBean的处理,会将这些属性值拼装成一个URL对象,在调用关键节点时,会根据URL的配置属性动态的去加载扩展点下面配置的实现类,在动态代理类里面实现对具体实现类的调用。
    下面粘贴几个实例代码

    • Protocol$Adaptive
    package com.alibaba.dubbo.rpc;
    import com.alibaba.dubbo.common.extension.ExtensionLoader;
    public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
        public void destroy() {throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
        }
        public int getDefaultPort() {throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
        }
        public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
            if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
            if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
            String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
            if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
            com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
            return extension.export(arg0);
        }
        public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
            if (arg1 == null) throw new IllegalArgumentException("url == null");
            com.alibaba.dubbo.common.URL url = arg1;
            String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
            if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
            com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
            return extension.refer(arg0, arg1);
        }
    }
    
    
    • Cluster$Adaptive
    package com.alibaba.dubbo.rpc.cluster;
    import com.alibaba.dubbo.common.extension.ExtensionLoader;
    public class Cluster$Adaptive implements com.alibaba.dubbo.rpc.cluster.Cluster {
        public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {
            if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
            if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
            String extName = url.getParameter("cluster", "failover");
            if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
            com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
            return extension.join(arg0);
        }
    }
    

    代码中的扩展点

    在Dubbo服务发布的过程中,第一次出现扩展点的加载是在doExportUrlsFor1Protocol()方法中,加载ConfiguratorFactory配置工厂。

            if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .hasExtension(url.getProtocol())) {
                url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                        .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
            }
    

    首先根据加载类类型获取扩展类加载器,然后根据扩展点名称获取具体的扩展点。先从缓存中获取对应类型的扩展器,如果缓存中不存在,则利用构造函数创建一个新的加载器,并放入到缓存中。

    public class ExtensionLoader<T> {
    
        private ExtensionLoader(Class<?> type) {
            this.type = type;
            objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
        }
        public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
            if (type == null)
                throw new IllegalArgumentException("Extension type == null");
            if (!type.isInterface()) {
                throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
            }
            if (!withExtensionAnnotation(type)) {
                throw new IllegalArgumentException("Extension type(" + type +
                        ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
            }
    
            ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
            if (loader == null) {
                EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
                loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
            }
            return loader;
        }
    
    
        public boolean hasExtension(String name) {
            if (name == null || name.length() == 0)
                throw new IllegalArgumentException("Extension name == null");
            try {
                this.getExtensionClass(name);
                return true;
            } catch (Throwable t) {
                return false;
            }
        }
    
        private ExtensionLoader(Class<?> type) {
            this.type = type;
            objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
        }
    
        public T getAdaptiveExtension() {
            Object instance = cachedAdaptiveInstance.get();
            if (instance == null) {
                if (createAdaptiveInstanceError == null) {
                    synchronized (cachedAdaptiveInstance) {
                        instance = cachedAdaptiveInstance.get();
                        if (instance == null) {
                            try {
                                instance = createAdaptiveExtension();
                                cachedAdaptiveInstance.set(instance);
                            } catch (Throwable t) {
                                createAdaptiveInstanceError = t;
                                throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
                            }
                        }
                    }
                } else {
                    throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
                }
            }
    
            return (T) instance;
        }
    
        private T createAdaptiveExtension() {
            try {
                return injectExtension((T) getAdaptiveExtensionClass().newInstance());
            } catch (Exception e) {
                throw new IllegalStateException("Can not create adaptive extension " + type + ", cause: " + e.getMessage(), e);
            }
        }
    }
    

    调用过程为:

    getExtensionClass(String name)
      ->getExtensionClass(String name)
    

    创建DubboProtocol的过程为,其中包含多个Wrapper的包装:

    Dubbo是URL驱动的。

    .getAdaptiveExtension():获取的是自适应类,是根据@Adaptive扩展点,动态生成class code,并经过编译生成的动态代理类。
    另一个常用的方法是:getExtension:用来获取宽扩展类实例。

    Protocol是Protocol$Adaptive,实际调用的是DubboProtocol
    ProxyFactoty是 ProxyFactoty$Adaptive,实际调用的是JavassistProxyFactory,用来生成代理类和调用者。
    此处代码意思是:使用DubboProtocol协议在本地暴露一个使用JavassistProxyFactory生成的代理类的调用者。

        private void exportLocal(URL url) {
            if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                URL local = URL.valueOf(url.toFullString())
                        .setProtocol(Constants.LOCAL_PROTOCOL)
                        .setHost(LOCALHOST)
                        .setPort(0);
                ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
                Exporter<?> exporter = protocol.export(
                        proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
                exporters.add(exporter);
                logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
            }
        }
    

    injectExtension()方法,用来处理方法名带有set()的方法,动态的设置处理参数。例如:此处的协议为动态,根据URL中配置的参数而定,所以添加了一个注入的方法。injectExtension()方法中,判断到有set()开始的方法,通过Object object = objectFactory.getExtension(pt, property)得到一个动态代理类Type$Adaptive,然后将该动态代理对象设置到参数中

    public class StubProxyFactoryWrapper implements ProxyFactory {
        public void setProtocol(Protocol protocol) {
            this.protocol = protocol;
        }
    }
    public class ExtensionLoader<T> {
        private T injectExtension(T instance) {
            try {
                if (objectFactory != null) {
                    for (Method method : instance.getClass().getMethods()) {
                        if (method.getName().startsWith("set")
                                && method.getParameterTypes().length == 1
                                && Modifier.isPublic(method.getModifiers())) {
                            Class<?> pt = method.getParameterTypes()[0];
                            try {
                                String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
                                Object object = objectFactory.getExtension(pt, property);
                                if (object != null) {
                                    method.invoke(instance, object);
                                }
                            } catch (Exception e) {
                                logger.error("fail to inject via method " + method.getName()
                                        + " of interface " + type.getName() + ": " + e.getMessage(), e);
                            }
                        }
                    }
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
            return instance;
        }
    }
    

    另一个常用的方法是:getActivateExtension(URL url, String[] values, String group):获取激活点

    @SPI
    public interface ExtensionFactory {
    
    }
    

    @SPI注解,用来提供获取ExtensionFactory的实现类:public class SpiExtensionFactory
    其中的isWrapperClass判断规则是:实现类的构造函数是否是包含加载类参数。如果是,则判断为包装类,并将包装类添加到cachedWrapperClasses成员变量中,加载类中不添加。
    @Activate注解:当某个类的上标注@Activate注解时,在loadClass的过程中,由于判断类上有@Activate注解,会将加载的类添加到cachedActivates成员变量中,作为在加载扩展点类时的返回对象。

    注意 JavassistProxyFactory类中getInvoker()方法,final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);

    @Activate注解在Filter中有使用案例:

    @Activate(group = {Constants.CONSUMER, Constants.PROVIDER}, value = Constants.CACHE_KEY)
    public class CacheFilter implements Filter {
    }
    

    在调用export()方法的过程当中,首先对Filter进行了加载。
    阶段性总结export()过程:
    在export()过程中,首先调用了包装DubboProtocol的Wrapper类,包括三个类,由外到内依次是:QosProtocolWrapper、ProtocolListenerWrapper,ProtocolFilter,由外层逐级调用内层,再调用到ProtocolFilterWrapper类的时候,调用了一个构建调用链的方法, protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)),其中Constants.SERVICE_FILTER_KEY=service.filter;Constants.PROVIDER = provider,其中,在URL的参数中,service.filter的value为null,所以在加载了filter的所有实现类之后,根据Activate的条件对Filter进行过滤;其中此处的过滤条件为 :group为group;在此过程中符合条件的Filtle有8个:

    0 = {ExceptionFilter@11068} 
    1 = {ClassLoaderFilter@11268} 
    2 = {EchoFilter@11442} 
    3 = {MonitorFilter@11832} 
    4 = {GenericFilter@11974} 
    5 = {TimeoutFilter@11997} 
    6 = {TraceFilter@12018} 
    7 = {ContextFilter@12032} 
    

    然后对Filter进行循环处理,循环创建invoker,并调用上一个invoker的invoker方法。
    然后调用InjvmProtocol的export()方法进行服务暴露,构造了一个InjvmExporter对象

        InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
            super(invoker);
            this.key = key;//key=com.bail.user.service.IUserService:1.0.0
            this.exporterMap = exporterMap;
            exporterMap.put(key, this);//将InjvmExporter自己放入到了map容器中
        }
    
    

    在exportLocal方法返回的Exporter为一个ListenerExporterWrapper类型的,并将该对象放入到exporters容器中,exporters为ServiceBean的一个成员变量。
    ServiceConfig处的export,protocol值为registry.

    接下来看DubboProtocol的发布过程:
    构建DubboExporter
    openServer

  • 相关阅读:
    复习一下 .Net: delegate(委托)、event(事件) 的基础知识,从头到尾实现事件!
    雕虫小技: 给枯燥的 .Net 控制台程序(字符界面)来点儿心跳 (关于退格 '\b' 的使用)
    .Net Remoting 事件回调 Client 函数方法完整实例: C# 实现控制台网络聊天室 (Console Remoting ChatRoom)
    一气呵成得到 MSSQL DB 中所有表的字段默认值约束的 DDL SQL 脚本
    根据数据生成 INSERT INTO ... 的 SQL (.Net C#, TSQL Store Procedure 分别实现)
    .Net/C# 与 J2EE/Java Web Service 互操作完整实例
    TSQL: 关于 Varbinary(Hex,Int) 与 Varchar(HexString) 之间的(数据类型)转换
    Linux零碎记录之ulimit【堆栈大小、stack size、进程数限制、文件句柄限制、linux用户空间限制】
    svn之svn:ignore命令行设置
    C语言零碎记录之extern
  • 原文地址:https://www.cnblogs.com/nangonghui/p/15598154.html
Copyright © 2011-2022 走看看