在阅读此文章之前,我希望阅读者对Spring 扩展机制的有一定的了解,比如:自定义标签与Spring整合, InitializingBean 接口,ApplicationContextAware,BeanNameAware,
BeanFactory 接口所起到的作用 ;从来没了解过的,请看我之前的关于Spring的博客
关于此篇文章受益于 :http://dubbo.apache.org/zh-cn/docs/source_code_guide/export-service.html dubbo官网文档
开始正题
(一)onApplicationEvent (事件监听)
dubbo 服务导出的方法是在 com.alibaba.dubbo.config.spring.ServiceBean 类中,dubbo 服务的导出过程始于在Spring 容器发生刷新事件,那么如何感知到Spring 容器发生刷新
事件呢? ~~ 得益于Spring提供的 ApplicationListener 接口,看如下代码实现:
public void onApplicationEvent(ApplicationEvent event) {
//在Spring bean 刷新后进行服务的导出;
if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {
if (isDelay() && ! isExported() && ! isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
}
}
实现该接口需要实现 onApplicationEvent 方法;事件的监听,在有event 事件发生后,Spring会自动进行触发此方法;
在Spring 容器发生刷新事件后进行导出 export(); 这一步就是最先开始的地方;
(二)检查参数,组装 URL
public synchronized void export() {
if (provider != null) {
//是否进行导出的操作,用于在我们配置了<dubbo:provider export="false" />的时候,本地进行操作,不进行服务暴露的时候
if (export == null) {
export = provider.getExport();
}
if (delay == null) {
delay = provider.getDelay();
}
}
if (export != null && ! export.booleanValue()) {
return;
}
//延时导出,线程睡眠,在高版本中,此方法改变成了schedule
if (delay != null && delay > 0) {
Thread thread = new Thread(new Runnable() {
public void run() {
try {
Thread.sleep(delay);
} catch (Throwable e) {
}
doExport();
}
});
thread.setDaemon(true);
thread.setName("DelayExportServiceThread");
thread.start();
} else {
doExport();
}
}
以上的方法就是进行了<dubbo:provider> 标签属性 export 与delay 的操作判断;
我们继续进行 doExport();
doExport() 又进行了一些的判断与初始化的动作,没什么好说的,接下来就是调用 doExportUrls();
private void doExportUrls() { List<URL> registryURLs = loadRegistries(true); for (ProtocolConfig protocolConfig : protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
loadRegistries() 会将一些必要信息进行排序封装,生成url地址链接如下格式:
registry://47.95.**.138:2181/com.alibaba.dubbo.registry.RegistryService?application=hm-service-cart&check=false&dubbo=2.5.3&pid=931®istry=zookeeper×tamp=1545835371053
有多少协议,就导出多少次:
doExportUrlsFor1Protocol(protocolConfig, registryURLs);这个方法我们在第4步分析;
我们先看看Dubbo spi 机制
(三) Dubbo SPI (服务发现机制)
DUBBO SPI 的实现主要依赖于。 com.alibaba.dubbo.common.extension.ExtensionLoader 这个类,这个类是dubbo 运行的容器,维护着一些组件的实例,就像Spring 容器一样;
SPI 服务发现机制;我给大家揭开它的面纱:
在源码阅读中,我们只要发现了像如下的代码,内部就进行了spi 的机制
ExtensionLoader.getExtensionLoader(XXX.class).getExtension(type);
ExtensionLoader.getExtensionLoader(XXX.class).getAdaptiveExtension();
xxx.class 是接口名字,必须带有@SPI的注解
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) { //不为空检测 if (type == null) throw new IllegalArgumentException("Extension type == null"); //必须的接口 if(!type.isInterface()) { throw new IllegalArgumentException("Extension type(" + type + ") is not interface!"); } //接口必须有@SPI 的注解 if(!withExtensionAnnotation(type)) { throw new IllegalArgumentException("Extension type(" + type + ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!"); } //实例化 ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); if (loader == null) { EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type)); loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); } return loader; }
具体如下
1.(大家可以去看一看dubbo jar 包下 META-INF/dubbo/internal 目录的文件):以接口作为文件名称,内部是ke y = 实现类
private static final String SERVICES_DIRECTORY = "META-INF/services/"; private static final String DUBBO_DIRECTORY = "META-INF/dubbo/"; private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";
2.在 getExtension 方法执行中,主要看 createExtension(name)方法;
public T getExtension(String name) { if (name == null || name.length() == 0) throw new IllegalArgumentException("Extension name == null"); if ("true".equals(name)) { return getDefaultExtension(); } Holder<Object> holder = cachedInstances.get(name); if (holder == null) { cachedInstances.putIfAbsent(name, new Holder<Object>()); holder = cachedInstances.get(name); } Object instance = holder.get(); if (instance == null) { synchronized (holder) { instance = holder.get(); if (instance == null) { instance = createExtension(name); holder.set(instance); } } } return (T) instance; }
createExtension(name)方法(ioc + aop )
private T createExtension(String name) {
//这个方法会从文件系统中获取name 所对应的value ,大家可以看看这个方法 Class<?> clazz = getExtensionClasses().get(name); if (clazz == null) { throw findException(name); } try { T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null) { EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); }
//ioc 操作 injectExtension(instance); Set<Class<?>> wrapperClasses = cachedWrapperClasses;
//aop 包装的操作,包装类你调用我我调用你,形成了一条执行链;wrapperClasses 是包装类,在loadFile() 方法中被装配 if (wrapperClasses != null && wrapperClasses.size() > 0) { for (Class<?> wrapperClass : wrapperClasses) { instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } } return instance; } catch (Throwable t) { throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type + ") could not be instantiated: " + t.getMessage(), t); } }
获取所有的扩展类:
private Map<String, Class<?>> getExtensionClasses() { Map<String, Class<?>> classes = cachedClasses.get(); if (classes == null) { synchronized (cachedClasses) { classes = cachedClasses.get(); if (classes == null) { classes = loadExtensionClasses(); cachedClasses.set(classes); } } } return classes; }
// 此方法已经getExtensionClasses方法同步过。 private Map<String, Class<?>> loadExtensionClasses() { final SPI defaultAnnotation = type.getAnnotation(SPI.class); if(defaultAnnotation != null) { String value = defaultAnnotation.value(); if(value != null && (value = value.trim()).length() > 0) { String[] names = NAME_SEPARATOR.split(value); if(names.length > 1) { throw new IllegalStateException("more than 1 default extension name on extension " + type.getName() + ": " + Arrays.toString(names)); } if(names.length == 1) cachedDefaultName = names[0]; } } Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
//从META-INF/services/ META-INF/dubbo/ /META-INF/dubbo/internal/ 这三个文件夹中获取class配置 loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY); loadFile(extensionClasses, DUBBO_DIRECTORY); loadFile(extensionClasses, SERVICES_DIRECTORY); return extensionClasses; }
//dubbo 扩展检索的文件
private static final String SERVICES_DIRECTORY = "META-INF/services/"; private static final String DUBBO_DIRECTORY = "META-INF/dubbo/"; private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";
这样对应着key 就可以实例化value,找到实现类了; 大家可以做一做小的demo进行一下测试:
/** 接口 * dubbo spi 机制,需要引用dubbo的spi 注解标签 */ @SPI public interface DubboSPI { public void sayHello(); } //实现类 public class DuboSPIImpl implements DubboSPI { public void sayHello(){ System.out.println("dubbo spi 机制运行......cys"); }; } //测试类 public static void main(String[] args) { ExtensionLoader<DubboSPI> extensionLoader = ExtensionLoader.getExtensionLoader(DubboSPI.class); DubboSPI optimusPrime = extensionLoader.getExtension("DubboSPI"); optimusPrime.sayHello(); }
在META-INF/dubbo 下配置创建 com.baseknow.spi.dubbospi.DubboSPI (你的接口名字)文件
DubboSPI = com.baseknow.spi.dubbospi.DuboSPIImpl
关于一些常规的操作就到这里;
高级特性:(动态生成字节码文件,加载进入内存)
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
如上:dubbo 通过 getAdaptiveExtension()进行扩展适配;通过javassist 将动态创建的类加载进内存;可以这么说,如上的protocol proxyFactory 这两个对象是不存在的
通过dubbo 扩展 SPI Adapter 机制,将会自动生成实现该接口的一个类(这个类是自动生成的,不存在我们的文件中),通过javassiste 进行记载,然后通过创建的Adapter 类进行方法
的调度;
具体请看 ExtensionLoader 类如下两个方法(源码生成➕编译):
private Class<?> createAdaptiveExtensionClass() { //生成源码code ,建议debug ,看一看生成的源码是什么样子的 String code = createAdaptiveExtensionClassCode(); ClassLoader classLoader = findClassLoader(); com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension(); //默认会进行javassist 将源码加载toclass() return compiler.compile(code, classLoader); } //源码生成的方法 private String createAdaptiveExtensionClassCode() { StringBuilder codeBuidler = new StringBuilder(); Method[] methods = type.getMethods(); boolean hasAdaptiveAnnotation = false; for(Method m : methods) { if(m.isAnnotationPresent(Adaptive.class)) { hasAdaptiveAnnotation = true; break; } } //........
如下是dubbo 通过 getAdaptiveExtension() 通过一系列判断 stringbbuilder 拼接而生成的 实现了Protocol接口的 适配器源码,根据传来的url来处理不同协议的调度
package com.alibaba.dubbo.rpc; import com.alibaba.dubbo.common.extension.ExtensionLoader; public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol { public void destroy() {throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!"); } public int getDefaultPort() {throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!"); } public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() ); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.export(arg0); } public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws java.lang.Class { if (arg1 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg1; String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() ); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.refer(arg0, arg1); } }
这是实现了ProxyFactory 接口的 适配器源码
package com.alibaba.dubbo.rpc; import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory {
public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = url.getParameter("proxy", "javassist"); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])"); com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName); return extension.getProxy(arg0); }
public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws java.lang.Object { if (arg2 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg2; String extName = url.getParameter("proxy", "javassist"); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])"); com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName); return extension.getInvoker(arg0, arg1, arg2); } }
以上两个类就是生成的源码code;
通过Javassist 进行将类进行加载成字节码 com.alibaba.dubbo.common.compiler.support.JavassistCompiler
总的来说就二步:
1.通过Dubbo 扩展机制 ExtensionLoader 生成一个接口的自定义实现类源码;
2.通过javassist 将生成的源码转换为字节码加载进入内存
关于Protocol 与 ProxyFactory 这两个动态的实现类,内部又是进行了spi机制的一些方法调用;大家看一看,因为下面的一些扩展都是基于这两个的,不然很难知道它进行调用了哪个实现类;当然还有 Transporter 接口实现等等也是该机制;
如下是Transport Adapter 源码:可以看出在没有指定传输的时候,默认使用的是netty
package com.alibaba.dubbo.remoting; import com.alibaba.dubbo.common.extension.ExtensionLoader; public class Transporter$Adpative implements com.alibaba.dubbo.remoting.Transporter { public com.alibaba.dubbo.remoting.Client connect(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL { if (arg0 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg0; String extName = url.getParameter("client", url.getParameter("transporter", "netty")); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([client, transporter])"); com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName); return extension.connect(arg0, arg1); } public com.alibaba.dubbo.remoting.Server bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL { if (arg0 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg0; String extName = url.getParameter("server", url.getParameter("transporter", "netty")); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([server, transporter])"); com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName); return extension.bind(arg0, arg1); } }
关于高级特性就到这里,具体的还是要看getAdaptiveExtension() 方法实现,里面是进行了 StringBuilder append 拼接源码生成新的类,再进行JAVAssist toClass() 操作,需要对
JAVAssist 了解,不了解,大约知道生成的新类长什么样子就好了;
(四) Netty(mina) 服务的启动,端口绑定
紧接着第二步的步伐,我们分析 doExportUrlsFor1Protocol();我们抛开一些小细节,具体看如下Mark 红色的部分
//如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露远程服务) if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){ if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } if (registryURLs != null && registryURLs.size() > 0 && url.getParameter("register", true)) { for (URL registryURL : registryURLs) { url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic")); URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); Exporter<?> exporter = protocol.export(invoker); exporters.add(exporter);
proxyFactory 与 protocol 都是动态生成类,大家可以看第四部分我贴出来的动态生成的源码,就可以知道真正的实现类是哪一个(默认是javassist 的实现类);
protocol 则是进行了aop 式的包装类,会将DubboProtocol 类 ProtocolFilterWrapper 类, ProtocolListenerWrapper类,RegistryProtocol 类的export()方法执行一边,为什么都会执行呢,请大家看 ExtensionLoader 的 createExtension 方法;如下:
private T createExtension(String name) { //这个方法会从文件系统中获取name 所对应的value ,大家可以看看这个方法 Class<?> clazz = getExtensionClasses().get(name); if (clazz == null) { throw findException(name); } try { T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null) { EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); } //ioc 操作 injectExtension(instance); Set<Class<?>> wrapperClasses = cachedWrapperClasses; //aop 包装的操作,包装类你调用我我调用你,形成了一条执行链; if (wrapperClasses != null && wrapperClasses.size() > 0) { for (Class<?> wrapperClass : wrapperClasses) { instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } } return instance; } catch (Throwable t) { throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type + ") could not be instantiated: " + t.getMessage(), t); } }
在调用 DubboProtocol 的export() 方法时候,会进行 openServer(url)操作;
openServer(url);
private void openServer(URL url) { // find server. String key = url.getAddress(); //client 也可以暴露一个只有server可以调用的服务。 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true); if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null) { serverMap.put(key, createServer(url)); } else { //server支持reset,配合override功能使用 server.reset(url); } } }
检查是否开启过服务,没有的话,createServer(url);创建服务,这里大家就往下面点,会经过如下的过程:
会使用dubbo SPI 机制返回Transporter 传输对象,在我们不在xml 做任何配置的情况下,默认使用的是netty,我们可以看Javassist 生成的Transporter Adapter 源码:
package com.alibaba.dubbo.remoting; import com.alibaba.dubbo.common.extension.ExtensionLoader; public class Transporter$Adpative implements com.alibaba.dubbo.remoting.Transporter { public com.alibaba.dubbo.remoting.Client connect(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL { if (arg0 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg0; String extName = url.getParameter("client", url.getParameter("transporter", "netty")); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([client, transporter])"); com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName); return extension.connect(arg0, arg1); } public com.alibaba.dubbo.remoting.Server bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL { if (arg0 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg0; String extName = url.getParameter("server", url.getParameter("transporter", "netty")); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([server, transporter])"); com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName); return extension.bind(arg0, arg1); } }
如果使用mina ,我们可以在xml 做如下配置:
<!-- 用dubbo协议在20880端口暴露服务 --> <dubbo:protocol name="dubbo" port="20881" transporter="mina" />
在实例化的时候会进行doOpen() 操作了,就是正常的netty api 流程了;
public NettyServer(URL url, ChannelHandler handler) throws RemotingException{ super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); } @Override protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); // https://issues.jboss.org/browse/NETTY-365 // https://issues.jboss.org/browse/NETTY-379 // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true)); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); /*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/ pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } }); // bind channel = bootstrap.bind(getBindAddress()); }
通过分析,netty 服务创建流程就是spi 的各种调用,所以大家了解dubbo 就必须了解它的spi 机制;
这样,一个netty 服务就启动了..
(五) 注册中心注册服务
注册中心我们以zookeeper 为例,也是Dubbo 推荐使用的注册中心以及生产部署中常用的;
注册中心的实现是 protocol 执行链中的RegistryProtocol 类中的export 方法
实现主要是CURATOR 框架进行节点的创建以及监听
我们看节点的创建过程:
递归创建节点,其中url地址是临时的节点,客户端断开会消失,其他的像/dubbo/接口地址 是永久的节点:如下图