问题
- channel 是如何处理发送一半中断后继续重发的
- channel 具体作用是什么
概述
这一节我们将介绍 Channel 和内部接口 Unsafe .其中Unsafe 是内部接口,聚合在Channel 中协助网络读写操作相关的操作,设计初衷就是 Channel 的内部辅助类,不应该被用户使用。
继承类分析
继承关系链 :
AbstractChannel -> AbstractNioChannel -> AbstractNioByteChannel -> NioSocketChannel 如下图 从以上的类结构我们也要学习一下类的构建,各个类实现应该实现的功能,最后生成的具体类具有不同的功能。 AbstractChannel ,保存以下重要的字段 ,主要 - EventLoop - localAddress - remoteAddress - unsafe - DefaultChannelPipleline - Future类 和 Promise类 等
AbstractNioChannel,从类名可以看出和nio 中 Channel 相关,注册,监听
private final SelectableChannel ch; protected final int readInterestOp; private volatile SelectionKey selectionKey; private volatile boolean inputShutdown; /** * The future of the current connection attempt. If not null, subsequent * connection attempts will fail. */ private ChannelPromise connectPromise; private ScheduledFuture<?> connectTimeoutFuture; private SocketAddress requestedRemoteAddress;
AbstractNioByteChannel 这个类是Channel对Byte进行操作,对ByteBuff的读写。
源码分析
AbstractChannel
AbstractChannel 的读写方法都是交由 ChannelPiple 来解决的
@Override public Channel read() { pipeline.read(); return this; } @Override public ChannelFuture write(Object msg) { return pipeline.write(msg); }
eventLoop方法,直接返回持有的 eventloop对象
@Override public EventLoop eventLoop() { return eventLoop; }
AbstractNioChannel
public abstract class AbstractNioChannel extends AbstractChannel { private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractNioChannel.class); // No.1 注册监听相关的字段 private final SelectableChannel ch; protected final int readInterestOp; private volatile SelectionKey selectionKey; private volatile boolean inputShutdown; // No.2 异步执行的字段,或是回调相关的字段 /** * The future of the current connection attempt. If not null, subsequent * connection attempts will fail. */ private ChannelPromise connectPromise; private ScheduledFuture<?> connectTimeoutFuture; private SocketAddress requestedRemoteAddress; ... //核心方法 @Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { //拿父类的channel对象(父类的channel对象是java原生channel 对象) selectionKey = javaChannel().register(eventLoop().selector, 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } } //开始read的操作 @Override protected void doBeginRead() throws Exception { if (inputShutdown) { return; } final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } //就是改变监听的事件 final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); } }
AbstractNioByteChannel
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { int writeSpinCount = -1; //循环 for (;;) { Object msg = in.current(true); if (msg == null) { // Wrote all messages.写完了(发送完了)所有的消息,清除标志,结束 clearOpWrite(); break; } if (msg instanceof ByteBuf) { //加入是ByteBuf类型 ByteBuf buf = (ByteBuf) msg; int readableBytes = buf.readableBytes(); //判断当前的可读字节是否为 0 ,为 0 丢弃掉 if (readableBytes == 0) { in.remove(); continue; } boolean setOpWrite = false; boolean done = false; long flushedAmount = 0; //循环发送次数 if (writeSpinCount == -1) { writeSpinCount = config().getWriteSpinCount(); } for (int i = writeSpinCount - 1; i >= 0; i --) { //doWriteBytes 子类实现 int localFlushedAmount = doWriteBytes(buf); if (localFlushedAmount == 0) { setOpWrite = true; break; } flushedAmount += localFlushedAmount; //一直到不可读 if (!buf.isReadable()) { done = true; break; } } //发送完,更新发送的进度(有可能没发完) in.progress(flushedAmount); if (done) { in.remove(); } else { //没发完,设置写半包标识,启动刷新线程继续发送之前没有发送完成的半包消息 incompleteWrite(setOpWrite); break; } } else if (msg instanceof FileRegion) { FileRegion region = (FileRegion) msg; boolean setOpWrite = false; boolean done = false; long flushedAmount = 0; if (writeSpinCount == -1) { writeSpinCount = config().getWriteSpinCount(); } //循环发送 for (int i = writeSpinCount - 1; i >= 0; i --) { long localFlushedAmount = doWriteFileRegion(region); if (localFlushedAmount == 0) { setOpWrite = true; break; } flushedAmount += localFlushedAmount; if (region.transfered() >= region.count()) { done = true; break; } } //发送完(有可能发送了一半)更新进度 in.progress(flushedAmount); if (done) { in.remove(); } else { //没法完,创建一个任务扔到EventLoop incompleteWrite(setOpWrite); break; } } else { throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg)); } } } //没写完(没发送完) protected final void incompleteWrite(boolean setOpWrite) { // Did not write completely. if (setOpWrite) { setOpWrite(); } else { // Schedule flush again later so other tasks can be picked up in the meantime //创建任务扔到 eventLoop执行 Runnable flushTask = this.flushTask; if (flushTask == null) { flushTask = this.flushTask = new Runnable() { @Override public void run() { flush(); } }; } eventLoop().execute(flushTask); } }
循环发送次数是指一次发送没有完成时(写半包),程序就继续尝试循环写操作,此时IO线程是不能处理其他事件的,例如读新的消息或者执行定时任务和 NioTask 等, 如果网络IO阻塞或者对方接收消息太慢,可能会导致线程假死,于是就要循环发送。
AbstractNioMessageChannel
我们再来看一下AbstractNioChannel 的另外一个子类 AbstractNioMessageChannel,直接看doWrite方法
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { final SelectionKey key = selectionKey(); final int interestOps = key.interestOps(); for (;;) { Object msg = in.current(); if (msg == null) { // Wrote all messages. if ((interestOps & SelectionKey.OP_WRITE) != 0) { key.interestOps(interestOps & ~SelectionKey.OP_WRITE); } break; } boolean done = false; for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { if (doWriteMessage(msg, in)) { done = true; break; } } if (done) { in.remove(); } else { // Did not write all messages. //没发送完,设置标志,交给 select 多路复用器轮询对应的channel重新发送尚未发送完成的半包信息 if ((interestOps & SelectionKey.OP_WRITE) == 0) { key.interestOps(interestOps | SelectionKey.OP_WRITE); } break; } } }
AbstractNioMessageChannel 和 AbstractNioByteChannel的区别在于
NioServerSocketChannel 和 NioServerChannel 的分析
NioSocketChannel 和 NioServerSocketChannel 的区别到底是什么?后者是服务端当中负责绑定端口,读取数据功能,连接和断开,写消息都不支持,这些功能都在NioSocketChannel中实现
AbstractNioMessageServerChannel 的具体子类是 NioServerSocketChannel(该类是服务器端接受处理客户端的channel),它的doReadMessages方法(被对应的unsafe类read方法,这里可能有点饶,具体看代码实现)分析如下
@Override protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = javaChannel().accept(); try { if (ch != null) { //构建一个NioSocketChannel放进数组中 buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }
NioServerChannel的源码分析
public class NioServerSocketChannel extends AbstractNioMessageServerChannel implements io.netty.channel.socket.ServerSocketChannel { private static final ChannelMetadata METADATA = new ChannelMetadata(false); private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class); private static ServerSocketChannel newSocket() { try { return ServerSocketChannel.open(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } } private final ServerSocketChannelConfig config; /** * Create a new instance */ public NioServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup) { super(null, eventLoop, childGroup, newSocket(), SelectionKey.OP_ACCEPT); config = new DefaultServerSocketChannelConfig(this, javaChannel().socket()); } @Override public InetSocketAddress localAddress() { return (InetSocketAddress) super.localAddress(); } @Override public ChannelMetadata metadata() { return METADATA; } @Override public ServerSocketChannelConfig config() { return config; } @Override public boolean isActive() { return javaChannel().socket().isBound(); } @Override public InetSocketAddress remoteAddress() { return null; } @Override protected ServerSocketChannel javaChannel() { return (ServerSocketChannel) super.javaChannel(); } @Override protected SocketAddress localAddress0() { return javaChannel().socket().getLocalSocketAddress(); } @Override protected void doBind(SocketAddress localAddress) throws Exception { javaChannel().socket().bind(localAddress, config.getBacklog()); } @Override protected void doClose() throws Exception { javaChannel().close(); } @Override protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = javaChannel().accept(); try { if (ch != null) { buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; } // Unnecessary stuff @Override protected boolean doConnect( SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { throw new UnsupportedOperationException(); } @Override protected void doFinishConnect() throws Exception { throw new UnsupportedOperationException(); } @Override protected SocketAddress remoteAddress0() { return null; } @Override protected void doDisconnect() throws Exception { throw new UnsupportedOperationException(); } @Override protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { throw new UnsupportedOperationException(); } }
可以看到 NioServerChannel 的主要都是 override 父类的方法,即是说大部分的逻辑都在父类 Abstract中进行了一层层的封装,给我们一个启发,好的类结构在
在一开始就已经设计好,最终的具体实现交由尾端实现。
总结
本文介绍了channel的主要功能作用。
参考资料
- 《Netty权威指南》