接口定义
@SPI("netty") //缺省值netty public interface Transporter { /** * Bind a server. * * @param url erver url * @param handler * @return server * @throws RemotingException * @see com.alibaba.dubbo.remoting.Transporters#bind(URL, ChannelHandler...) *url参数 含有server 或者transporter */ @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY}) Server bind(URL url, ChannelHandler handler) throws RemotingException; /** * Connect to a server. * * @param url server url * @param handler * @return client * @throws RemotingException * @see com.alibaba.dubbo.remoting.Transporters#connect(URL, ChannelHandler...) * url参数含有server 或者transporter */ @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY}) Client connect(URL url, ChannelHandler handler) throws RemotingException; }
类图
说明
接上一篇《dubbo源码阅读-远程暴露(七)之Exchangers》
@Override public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { //1. handler 会再次经过2层包装,增加功能 ChannelHandler h = new DecodeHandler(new HeaderExchangeHandler(handler)); //<1>2. Transports 操作会启动netty 监听端口,配置序列化实现, Server s = Transporters.bind(url,h); /** * <6>HeaderExchangeServer: 会对nettyServer 进行包装, 主要增加2个功能: * * a. 对channel进行 空闲时间检测,超过则关闭连接,节省资源。 * * b. 如果server关闭,则发送消息给client端,不再发送请求到该server。 */ return new HeaderExchangeServer( s); }
Transporters
<1>bind
com.alibaba.dubbo.remoting.Transporters#bind(com.alibaba.dubbo.common.URL, com.alibaba.dubbo.remoting.ChannelHandler...)
/** * @param url * @param handlers 经过DecodeHandler->HeaderExchangeHandler->com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol#requestHandler逐级装饰 * @return * @throws RemotingException */ public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { //没做特殊处理传入的是一个 handler = handlers[0]; } else { //如果传入多个handles 通过ChannelHandlerDispatcher包装 内部通过包装使每个Handle都能监听各个事件 handler = new ChannelHandlerDispatcher(handlers); } /** * <2>getTransporter * <3>bind */ return getTransporter().bind(url, handler); }
<2>getTransporter
com.alibaba.dubbo.remoting.Transporters#getTransporter
public static Transporter getTransporter() { //SPI扩展点 缺省值是 @SPI("netty") return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension(); }
NettyTransporter
/** * 经过DecodeHandler->HeaderExchangeHandler->com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol#requestHandler逐级装饰 * @param url erver url * @param listener * @return * @throws RemotingException */ @Override public Server bind(URL url, ChannelHandler listener) throws RemotingException { //<3>创建一个nettyServer 此处handle是 return new NettyServer(url, listener); }
<3>NettyServer
com.alibaba.dubbo.remoting.transport.netty4.NettyServer#NettyServer
/** * <4>ChannelHandlers.wrap * <6>super * @param url * @param handler * @throws RemotingException */ public NettyServer(URL url, ChannelHandler handler) throws RemotingException { super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); }
<4>ChannelHandlers.wrap
public static ChannelHandler wrap(ChannelHandler handler, URL url) { //<5>打包 return ChannelHandlers.getInstance().wrapInternal(handler, url); }
<5>wrapInternal
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { //将handler经过MultiMessageHandler->HeartbeatHandler->SPI获取Dispatcher默认是ALL 进行装饰 return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url))); }
<7>doOpen
com.alibaba.dubbo.remoting.transport.netty4.NettyServer#doOpen
@Override protected void doOpen() throws Throwable { /** * boss线程,主要监听端口和获取worker线程及分配socketChannel给worker * NamedThreadFactory自定义线程工厂 并设置线程为守护线程 主线程关闭 子线程自动关闭 */ bootstrap = new ServerBootstrap(); // worker线程负责数据读写 bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); //创建niosocket工厂 workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true)); /** * NettyServerHandler->NettyServer->...-ProtocolRequestHandle * f * */ final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); channels = nettyServerHandler.getChannels(); //参数讲解:https://blog.csdn.net/zhongzunfa/article/details/94590670 bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)//tcp协议 禁用Nagle 算法 .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)//tcp协议 允许通一个端口可以绑定到多个套接字 .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)//tcp协议 .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder())//解码器 .addLast("encoder", adapter.getEncoder())//编码器 .addLast("handler", nettyServerHandler);//处理器 } }); // 启动服务 ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); channelFuture.syncUninterruptibly(); channel = channelFuture.channel(); }
<6>AbstractServer
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); /** * 从url获取绑定ip和端口如:dubbo://127.0.0.1:23888.... * localAddress例子:/192.168.2.1:23888 * 成员变量 */ localAddress = getUrl().toInetSocketAddress(); //获取绑定id优先从url参数bind.ip获取 获取不到取url里面封装的 String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost()); //获取绑定端口 优先从url bind.port 获取获取不到取url封装的 int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort()); //anyhost是否有取到绑定ip 参见:https://www.cnblogs.com/LQBlog/p/12469007.html#autoid-6-12-0 if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) { bindIp = NetUtils.ANYHOST; } //初始化绑定address 成员变量 bindAddress = new InetSocketAddress(bindIp, bindPort); /** * 获取url accepts 控制客户端连接服务端的连接数控制 默认0 * 文档地址:http://dubbo.apache.org/zh-cn/docs/user/references/xml/dubbo-protocol.html * 成员变量 */ this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS); /** * 获取url参数 idle.timeout 空闲超时时间,单位:毫秒 * 成员变量 */ this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); try { //<7>开启服务器 模板方法模式 由子类实现 doOpen(); if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } } catch (Throwable t) { throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t); } //获得线程池 DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort())); }