Unsafe是托委访问socket,那么Channel是直接提供给开发者使用的
Channel 主要有两个实现 NioServerSocketChannel同NioSocketChannel 致于其它不常用不在研究范围内
NioServerSocketChannel 是给server用的,程序由始至终只有一个NioServerSocketChannel
NioSocketChannel 是给客户端用的,每个连接生成一个NioSocketChannel 对象
NioSocketChannel同NioSocketChannel的继承关系
NioSocketChannel -> AbstractNioByteChannel -> AbstractNioChannel -> AbstractChannel
NioServerSocketChannel -> AbstractNioMessageChannel-> AbstractNioChannel -> AbstractChannel
小提示:如果看文字不够直观可以在eclipse里按快捷键 选择类 ctrl+t
channel有unsafe相应的实现类,反之亦是。其实功能是很简单的,划分太多对象目的是对某部分功能重用,有时也可能因过渡设计造成
对于channel我们主要分析 I/O read/write操作
public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel { private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); //构造时就绑定SelectorProvider,然后注册OP_ACCEPT public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } /** server read操作对应的为readMessages 参数是个数组,是C语言书写风格,如果需要返回多种类型数据,那么传个对象进去外部就能获取到 这里比较重要,当有接收到socket时,生成NioSocketChannel对象
读者如果还有印象的话在讲NioEventLoop 有提示netty read 操作是不分 OP_ACCEPT、OP_READ的,可以在这方法打上断点观察 */ @Override protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = javaChannel().accept(); try { if (ch != null) { //生成NioSocketChannel buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { ch.close(); } return 0; } //server 应该没有write操作才对,因为server是一对多处理,不知道发给那一个clinet @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception {} }
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel { public NioSocketChannel(Channel parent, SocketChannel socket) { super(parent, socket); config = new NioSocketChannelConfig(this, socket.socket()); } //////////////////////////////这部分是unsafe底层调用上层的实现////////////////////////////////////////////// @Override protected int doReadBytes(ByteBuf byteBuf) throws Exception { final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); //这里设置byteBuf写入数据坐标 allocHandle.attemptedBytesRead(byteBuf.writableBytes()); return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); } @Override protected int doWriteBytes(ByteBuf buf) throws Exception { final int expectedWrittenBytes = buf.readableBytes(); return buf.readBytes(javaChannel(), expectedWrittenBytes); } @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { int size = in.size(); //没有数据退出 if (size == 0) { clearOpWrite(); break; } long writtenBytes = 0; //记录写数据size boolean done = false; //是否完成 boolean setOpWrite = false; ByteBuffer[] nioBuffers = in.nioBuffers(); int nioBufferCnt = in.nioBufferCount(); long expectedWrittenBytes = in.nioBufferSize(); SocketChannel ch = javaChannel(); //这里有三种分支处理 //如果没有ByteBuffer 有可能只发送几个byte //1跟default逻辑其实是一样的 switch (nioBufferCnt) { case 0: //调用父类 AbstractNioByteChannel doWrite,逻辑基本相同,不同的是AbstractNioByteChannel处理的是byte 实现调用的是 doWriteBytes(ByteBuf buf)方法。。。 super.doWrite(in); return; case 1: //这里只循环16次,可以看出是复制下面代码的哈。。。 ByteBuffer nioBuffer = nioBuffers[0]; for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { final int localWrittenBytes = ch.write(nioBuffer); if (localWrittenBytes == 0) { setOpWrite = true; break; } expectedWrittenBytes -= localWrittenBytes; writtenBytes += localWrittenBytes; if (expectedWrittenBytes == 0) { done = true; break; } } break; default: //多个ByteBuffer时跟上面逻辑一样 for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); if (localWrittenBytes == 0) { setOpWrite = true; break; } expectedWrittenBytes -= localWrittenBytes; writtenBytes += localWrittenBytes; if (expectedWrittenBytes == 0) { done = true; break; } } break; } // Release the fully written buffers, and update the indexes of the partially written buffer. in.removeBytes(writtenBytes); if (!done) { // Did not write all buffers completely. incompleteWrite(setOpWrite); break; } } } }
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { //生成NioSocketChannel时就绑定 unsafe pipeline protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); } } protected abstract class AbstractUnsafe implements Unsafe { private void register0(ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); // doRegister 是调用 AbstractNioChannel selectionKey = javaChannel().register(eventLoop().selector, 0, this); neverRegistered = false; registered = true; //这里是添加 Handler 每个Handler会生成一个Context pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); //通知Handler Registered pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { //通知Handler Active pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { //....... } } }
小结:看似很复杂的Channel实现其实没想象难,大多数读写坐标记录交给ByteBuf处理掉了
1.server每个client连接转换成NioSocketChannel对象
2.构建NioSocketChannel时就已经生成 unsafe、pipeline