zoukankan      html  css  js  c++  java
  • DUBBO 服务导出实现

    在阅读此文章之前,我希望阅读者对Spring 扩展机制的有一定的了解,比如:自定义标签与Spring整合, InitializingBean 接口,ApplicationContextAware,BeanNameAware,

    BeanFactory 接口所起到的作用 ;从来没了解过的,请看我之前的关于Spring的博客 

    关于此篇文章受益于 :http://dubbo.apache.org/zh-cn/docs/source_code_guide/export-service.html   dubbo官网文档

    开始正题

    (一)onApplicationEvent (事件监听)

    dubbo 服务导出的方法是在 com.alibaba.dubbo.config.spring.ServiceBean 类中,dubbo 服务的导出过程始于在Spring 容器发生刷新事件,那么如何感知到Spring 容器发生刷新

    事件呢? ~~ 得益于Spring提供的 ApplicationListener 接口,看如下代码实现:

    public void onApplicationEvent(ApplicationEvent event) {
        //在Spring bean 刷新后进行服务的导出;
            if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {
                if (isDelay() && ! isExported() && ! isUnexported()) {
                    if (logger.isInfoEnabled()) {
                        logger.info("The service ready on spring started. service: " + getInterface());
                    }
                    export();
                }
            }
        }

    实现该接口需要实现 onApplicationEvent 方法;事件的监听,在有event 事件发生后,Spring会自动进行触发此方法;

    在Spring 容器发生刷新事件后进行导出 export();   这一步就是最先开始的地方;

    (二)检查参数,组装 URL

    public synchronized void export() {
            if (provider != null) {
            //是否进行导出的操作,用于在我们配置了<dubbo:provider export="false" />的时候,本地进行操作,不进行服务暴露的时候
                if (export == null) {
                    export = provider.getExport();
                }
                if (delay == null) {
                    delay = provider.getDelay();
                }
            }
            if (export != null && ! export.booleanValue()) {
                return;
            }
            //延时导出,线程睡眠,在高版本中,此方法改变成了schedule
            if (delay != null && delay > 0) {
                Thread thread = new Thread(new Runnable() {
                    public void run() {
                        try {
                            Thread.sleep(delay);
                        } catch (Throwable e) {
                        }
                        doExport();
                    }
                });
                thread.setDaemon(true);
                thread.setName("DelayExportServiceThread");
                thread.start();
            } else {
                doExport();
            }
        }        

    以上的方法就是进行了<dubbo:provider> 标签属性 export 与delay 的操作判断;

    我们继续进行  doExport();

     doExport() 又进行了一些的判断与初始化的动作,没什么好说的,接下来就是调用 doExportUrls();

    private void doExportUrls() {
            List<URL> registryURLs = loadRegistries(true);
            for (ProtocolConfig protocolConfig : protocols) {
                doExportUrlsFor1Protocol(protocolConfig, registryURLs);
            }
        }
    loadRegistries() 会将一些必要信息进行排序封装,生成url地址链接如下格式:

    registry://47.95.**.138:2181/com.alibaba.dubbo.registry.RegistryService?application=hm-service-cart&check=false&dubbo=2.5.3&pid=931&registry=zookeeper&timestamp=1545835371053
    有多少协议,就导出多少次:

    doExportUrlsFor1Protocol(protocolConfig, registryURLs);这个方法我们在第4步分析;

    我们先看看Dubbo spi 机制
      

     (三) Dubbo SPI (服务发现机制)

    DUBBO SPI 的实现主要依赖于。 com.alibaba.dubbo.common.extension.ExtensionLoader 这个类,这个类是dubbo 运行的容器,维护着一些组件的实例,就像Spring 容器一样;

    SPI 服务发现机制;我给大家揭开它的面纱:

    在源码阅读中,我们只要发现了像如下的代码,内部就进行了spi 的机制

    ExtensionLoader.getExtensionLoader(XXX.class).getExtension(type);
    ExtensionLoader.getExtensionLoader(XXX.class).getAdaptiveExtension();

    xxx.class 是接口名字,必须带有@SPI的注解

    public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
        //不为空检测
            if (type == null)
                throw new IllegalArgumentException("Extension type == null");
    //必须的接口
            if(!type.isInterface()) {
                throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
            }
    //接口必须有@SPI 的注解
            if(!withExtensionAnnotation(type)) {
                throw new IllegalArgumentException("Extension type(" + type + 
                        ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
            }
            //实例化
            ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
            if (loader == null) {
                EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
                loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
            }
            return loader;
        }

    具体如下

    1.(大家可以去看一看dubbo jar 包下 META-INF/dubbo/internal 目录的文件):以接口作为文件名称,内部是ke y = 实现类

     private static final String SERVICES_DIRECTORY = "META-INF/services/";
    
     private static final String DUBBO_DIRECTORY = "META-INF/dubbo/";
        
     private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";

    2.在 getExtension  方法执行中,主要看  createExtension(name)方法;

    public T getExtension(String name) {
            if (name == null || name.length() == 0)
                throw new IllegalArgumentException("Extension name == null");
            if ("true".equals(name)) {
                return getDefaultExtension();
            }
            Holder<Object> holder = cachedInstances.get(name);
            if (holder == null) {
                cachedInstances.putIfAbsent(name, new Holder<Object>());
                holder = cachedInstances.get(name);
            }
            Object instance = holder.get();
            if (instance == null) {
                synchronized (holder) {
                    instance = holder.get();
                    if (instance == null) {
                        instance = createExtension(name);
                        holder.set(instance);
                    }
                }
            }
            return (T) instance;
        }
    createExtension(name)方法(ioc + aop )
    private T createExtension(String name) {
          //这个方法会从文件系统中获取name 所对应的value ,大家可以看看这个方法 Class
    <?> clazz = getExtensionClasses().get(name); if (clazz == null) { throw findException(name); } try { T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null) { EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); }
            //ioc 操作 injectExtension(instance); Set
    <Class<?>> wrapperClasses = cachedWrapperClasses;
           //aop 包装的操作,包装类你调用我我调用你,形成了一条执行链;wrapperClasses 是包装类,在loadFile() 方法中被装配
    if (wrapperClasses != null && wrapperClasses.size() > 0) { for (Class<?> wrapperClass : wrapperClasses) { instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } } return instance; } catch (Throwable t) { throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type + ") could not be instantiated: " + t.getMessage(), t); } }

    获取所有的扩展类:

        private Map<String, Class<?>> getExtensionClasses() {
            Map<String, Class<?>> classes = cachedClasses.get();
            if (classes == null) {
                synchronized (cachedClasses) {
                    classes = cachedClasses.get();
                    if (classes == null) {
                        classes = loadExtensionClasses();
                        cachedClasses.set(classes);
                    }
                }
            }
            return classes;
        }
     // 此方法已经getExtensionClasses方法同步过。
        private Map<String, Class<?>> loadExtensionClasses() {
            final SPI defaultAnnotation = type.getAnnotation(SPI.class);
            if(defaultAnnotation != null) {
                String value = defaultAnnotation.value();
                if(value != null && (value = value.trim()).length() > 0) {
                    String[] names = NAME_SEPARATOR.split(value);
                    if(names.length > 1) {
                        throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
                                + ": " + Arrays.toString(names));
                    }
                    if(names.length == 1) cachedDefaultName = names[0];
                }
            }
            
            Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
        //从META-INF/services/ META-INF/dubbo/ /META-INF/dubbo/internal/ 这三个文件夹中获取class配置 loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY); loadFile(extensionClasses, DUBBO_DIRECTORY); loadFile(extensionClasses, SERVICES_DIRECTORY);
    return extensionClasses; }
      //dubbo 扩展检索的文件

    private static final String SERVICES_DIRECTORY = "META-INF/services/"; private static final String DUBBO_DIRECTORY = "META-INF/dubbo/"; private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";

    这样对应着key 就可以实例化value,找到实现类了; 大家可以做一做小的demo进行一下测试:

    /**
    接口
     * dubbo spi 机制,需要引用dubbo的spi 注解标签
     */
    @SPI
    public interface DubboSPI {
    
        public void sayHello();
    }
    
    //实现类
    public class DuboSPIImpl implements DubboSPI {
    
        public void sayHello(){
            System.out.println("dubbo spi 机制运行......cys");
        };
    
    }
    
    
    
    //测试类
    public static void main(String[] args) {
            ExtensionLoader<DubboSPI> extensionLoader =
                    ExtensionLoader.getExtensionLoader(DubboSPI.class);
            DubboSPI optimusPrime = extensionLoader.getExtension("DubboSPI");
            optimusPrime.sayHello();
    
        }

    在META-INF/dubbo 下配置创建 com.baseknow.spi.dubbospi.DubboSPI (你的接口名字)文件 

    DubboSPI = com.baseknow.spi.dubbospi.DuboSPIImpl

    关于一些常规的操作就到这里;

    高级特性(动态生成字节码文件,加载进入内存)

    private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
    
    private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

     如上:dubbo 通过 getAdaptiveExtension()进行扩展适配;通过javassist 将动态创建的类加载进内存;可以这么说,如上的protocol  proxyFactory 这两个对象是不存在的

    通过dubbo 扩展 SPI Adapter 机制,将会自动生成实现该接口的一个类(这个类是自动生成的,不存在我们的文件中),通过javassiste 进行记载,然后通过创建的Adapter 类进行方法

    的调度;

     具体请看 ExtensionLoader 类如下两个方法(源码生成➕编译):

    private Class<?> createAdaptiveExtensionClass() {
            //生成源码code ,建议debug ,看一看生成的源码是什么样子的
            String code = createAdaptiveExtensionClassCode();
            ClassLoader classLoader = findClassLoader();
            com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
        //默认会进行javassist 将源码加载toclass()
            return compiler.compile(code, classLoader);
        }
    
        //源码生成的方法
        private String createAdaptiveExtensionClassCode() {
            StringBuilder codeBuidler = new StringBuilder();
            Method[] methods = type.getMethods();
            boolean hasAdaptiveAnnotation = false;
            for(Method m : methods) {
                if(m.isAnnotationPresent(Adaptive.class)) {
                    hasAdaptiveAnnotation = true;
                    break;
                }
            }        
    //........ 

     如下是dubbo   通过 getAdaptiveExtension() 通过一系列判断 stringbbuilder 拼接而生成的 实现了Protocol接口的 适配器源码,根据传来的url来处理不同协议的调度

    package com.alibaba.dubbo.rpc;
    import com.alibaba.dubbo.common.extension.ExtensionLoader;
    public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
    public void destroy() {throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }
    public int getDefaultPort() {throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }
    
    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
    if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
    if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
    String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
    if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
    com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
    return extension.export(arg0);
    }
    
    public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws java.lang.Class {
    if (arg1 == null) throw new IllegalArgumentException("url == null");
    com.alibaba.dubbo.common.URL url = arg1;
    String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
    if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
    com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
    return extension.refer(arg0, arg1);
    }
    }

     这是实现了ProxyFactory  接口的 适配器源码

    package com.alibaba.dubbo.rpc;
    import com.alibaba.dubbo.common.extension.ExtensionLoader;
    public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory {
    public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = url.getParameter("proxy", "javassist"); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])"); com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName); return extension.getProxy(arg0); }
    public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws java.lang.Object { if (arg2 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg2; String extName = url.getParameter("proxy", "javassist"); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])"); com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName); return extension.getInvoker(arg0, arg1, arg2); } }

     以上两个类就是生成的源码code;

    通过Javassist 进行将类进行加载成字节码 com.alibaba.dubbo.common.compiler.support.JavassistCompiler    

    总的来说就二步:

    1.通过Dubbo 扩展机制 ExtensionLoader 生成一个接口的自定义实现类源码;
    2.通过javassist 将生成的源码转换为字节码加载进入内存

    关于Protocol 与 ProxyFactory 这两个动态的实现类,内部又是进行了spi机制的一些方法调用;大家看一看,因为下面的一些扩展都是基于这两个的,不然很难知道它进行调用了哪个实现类;当然还有 Transporter 接口实现等等也是该机制;

    如下是Transport Adapter 源码:可以看出在没有指定传输的时候,默认使用的是netty

    package com.alibaba.dubbo.remoting;
    import com.alibaba.dubbo.common.extension.ExtensionLoader;
    public class Transporter$Adpative implements com.alibaba.dubbo.remoting.Transporter {
    public com.alibaba.dubbo.remoting.Client connect(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL {
    if (arg0 == null) throw new IllegalArgumentException("url == null");
    com.alibaba.dubbo.common.URL url = arg0;
    String extName = url.getParameter("client", url.getParameter("transporter", "netty"));
    if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([client, transporter])");
    com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName);
    return extension.connect(arg0, arg1);
    }
    public com.alibaba.dubbo.remoting.Server bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL {
    if (arg0 == null) throw new IllegalArgumentException("url == null");
    com.alibaba.dubbo.common.URL url = arg0;
    String extName = url.getParameter("server", url.getParameter("transporter", "netty"));
    if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([server, transporter])");
    com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName);
    return extension.bind(arg0, arg1);
    }
    }

     关于高级特性就到这里,具体的还是要看getAdaptiveExtension() 方法实现,里面是进行了 StringBuilder  append 拼接源码生成新的类,再进行JAVAssist toClass() 操作,需要对

    JAVAssist 了解,不了解,大约知道生成的新类长什么样子就好了;

      (四) Netty(mina) 服务的启动,端口绑定

    紧接着第二步的步伐,我们分析 doExportUrlsFor1Protocol();我们抛开一些小细节,具体看如下Mark 红色的部分

     //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露远程服务)
                if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
                    if (logger.isInfoEnabled()) {
                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                    }
                    if (registryURLs != null && registryURLs.size() > 0
                            && url.getParameter("register", true)) {
                        for (URL registryURL : registryURLs) {
                            url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                            URL monitorUrl = loadMonitor(registryURL);
                            if (monitorUrl != null) {
                                url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                            }
                            if (logger.isInfoEnabled()) {
                                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                            }
                            Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
    
                            Exporter<?> exporter = protocol.export(invoker);
                            exporters.add(exporter);
    proxyFactory 与 protocol 都是动态生成类,大家可以看第四部分我贴出来的动态生成的源码,就可以知道真正的实现类是哪一个(默认是javassist 的实现类);

    protocol 则是进行了aop 式的包装类,会将DubboProtocol 类 ProtocolFilterWrapper 类, ProtocolListenerWrapper类,RegistryProtocol 类的export()方法执行一边,为什么都会执行呢,请大家看 ExtensionLoader 的 createExtension 方法;如下:
    private T createExtension(String name) {
          //这个方法会从文件系统中获取name 所对应的value ,大家可以看看这个方法
            Class<?> clazz = getExtensionClasses().get(name);
            if (clazz == null) {
                throw findException(name);
            }
            try {
                T instance = (T) EXTENSION_INSTANCES.get(clazz);
                if (instance == null) {
                    EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
                    instance = (T) EXTENSION_INSTANCES.get(clazz);
                }
            //ioc 操作
                injectExtension(instance);
                Set<Class<?>> wrapperClasses = cachedWrapperClasses;
           //aop 包装的操作,包装类你调用我我调用你,形成了一条执行链;
                if (wrapperClasses != null && wrapperClasses.size() > 0) {
                    for (Class<?> wrapperClass : wrapperClasses) {
                        instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                    }
                }
                return instance;
            } catch (Throwable t) {
                throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
                        type + ")  could not be instantiated: " + t.getMessage(), t);
            }
        }

    在调用 DubboProtocol 的export() 方法时候,会进行 openServer(url)操作;

     openServer(url);
    private void openServer(URL url) {
            // find server.
            String key = url.getAddress();
            //client 也可以暴露一个只有server可以调用的服务。
            boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
            if (isServer) {
                ExchangeServer server = serverMap.get(key);
                if (server == null) {
                    serverMap.put(key, createServer(url));
                } else {
                    //server支持reset,配合override功能使用
                    server.reset(url);
                }
            }
        }

    检查是否开启过服务,没有的话,createServer(url);创建服务,这里大家就往下面点,会经过如下的过程:

     

    会使用dubbo SPI 机制返回Transporter 传输对象,在我们不在xml 做任何配置的情况下,默认使用的是netty,我们可以看Javassist 生成的Transporter Adapter 源码:

    package com.alibaba.dubbo.remoting;
    import com.alibaba.dubbo.common.extension.ExtensionLoader;
    public class Transporter$Adpative implements com.alibaba.dubbo.remoting.Transporter {
    public com.alibaba.dubbo.remoting.Client connect(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL {
    if (arg0 == null) throw new IllegalArgumentException("url == null");
    com.alibaba.dubbo.common.URL url = arg0;
    String extName = url.getParameter("client", url.getParameter("transporter", "netty"));
    if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([client, transporter])");
    com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName);
    return extension.connect(arg0, arg1);
    }
    public com.alibaba.dubbo.remoting.Server bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL {
    if (arg0 == null) throw new IllegalArgumentException("url == null");
    com.alibaba.dubbo.common.URL url = arg0;
    String extName = url.getParameter("server", url.getParameter("transporter", "netty"));
    if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([server, transporter])");
    com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName);
    return extension.bind(arg0, arg1);
    }
    }

    如果使用mina ,我们可以在xml 做如下配置:

        <!-- 用dubbo协议在20880端口暴露服务 -->
        <dubbo:protocol name="dubbo" port="20881" transporter="mina" />

    在实例化的时候会进行doOpen() 操作了,就是正常的netty api 流程了; 

    public NettyServer(URL url, ChannelHandler handler) throws RemotingException{
            super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
        }
    
        @Override
        protected void doOpen() throws Throwable {
            NettyHelper.setNettyLoggerFactory();
            ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
            ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
            ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
            bootstrap = new ServerBootstrap(channelFactory);
            
            final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
            channels = nettyHandler.getChannels();
            // https://issues.jboss.org/browse/NETTY-365
            // https://issues.jboss.org/browse/NETTY-379
            // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
            bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
                public ChannelPipeline getPipeline() {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
                    ChannelPipeline pipeline = Channels.pipeline();
                    /*int idleTimeout = getIdleTimeout();
                    if (idleTimeout > 10000) {
                        pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                    }*/
                    pipeline.addLast("decoder", adapter.getDecoder());
                    pipeline.addLast("encoder", adapter.getEncoder());
                    pipeline.addLast("handler", nettyHandler);
                    return pipeline;
                }
            });
            // bind
            channel = bootstrap.bind(getBindAddress());
        }
    通过分析,netty 服务创建流程就是spi 的各种调用,所以大家了解dubbo 就必须了解它的spi 机制;
    这样,一个netty 服务就启动了.. 

     (五) 注册中心注册服务

    注册中心我们以zookeeper 为例,也是Dubbo 推荐使用的注册中心以及生产部署中常用的;

    注册中心的实现是 protocol 执行链中的RegistryProtocol 类中的export 方法

    实现主要是CURATOR 框架进行节点的创建以及监听

    我们看节点的创建过程:

     递归创建节点,其中url地址是临时的节点,客户端断开会消失,其他的像/dubbo/接口地址 是永久的节点:如下图

     

  • 相关阅读:
    oracle使用expdp备份数据库
    用Setuptools构建和分发程序包
    C#5.0-原生异步编程方式
    任务并行库
    线程-线程池1
    多线程-3(同步)
    多线程-2(线程同步)
    线程---1
    高性能-GC3
    高性能-GC2
  • 原文地址:https://www.cnblogs.com/iscys/p/10177089.html
Copyright © 2011-2022 走看看