zoukankan      html  css  js  c++  java
  • [编织消息框架][netty源码分析]8 Channel 实现类NioSocketChannel职责与实现

    Unsafe是托委访问socket,那么Channel是直接提供给开发者使用的

    Channel 主要有两个实现 NioServerSocketChannel同NioSocketChannel 致于其它不常用不在研究范围内

    NioServerSocketChannel 是给server用的,程序由始至终只有一个NioServerSocketChannel

    NioSocketChannel 是给客户端用的,每个连接生成一个NioSocketChannel 对象

    NioSocketChannel同NioSocketChannel的继承关系

    NioSocketChannel -> AbstractNioByteChannel -> AbstractNioChannel -> AbstractChannel

    NioServerSocketChannel -> AbstractNioMessageChannel-> AbstractNioChannel -> AbstractChannel

    小提示:如果看文字不够直观可以在eclipse里按快捷键 选择类 ctrl+t 

    channel有unsafe相应的实现类,反之亦是。其实功能是很简单的,划分太多对象目的是对某部分功能重用,有时也可能因过渡设计造成

    对于channel我们主要分析 I/O read/write操作

    public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel {
        private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
    
        //构造时就绑定SelectorProvider,然后注册OP_ACCEPT
        public NioServerSocketChannel() {
            this(newSocket(DEFAULT_SELECTOR_PROVIDER));
        }
        
        public NioServerSocketChannel(ServerSocketChannel channel) {
            super(null, channel, SelectionKey.OP_ACCEPT);
            config = new NioServerSocketChannelConfig(this, javaChannel().socket());
        }
        
     
        /**
        server read操作对应的为readMessages
        参数是个数组,是C语言书写风格,如果需要返回多种类型数据,那么传个对象进去外部就能获取到
        这里比较重要,当有接收到socket时,生成NioSocketChannel对象
       读者如果还有印象的话在讲NioEventLoop 有提示netty read 操作是不分 OP_ACCEPT、OP_READ的,可以在这方法打上断点观察
    */ @Override protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = javaChannel().accept(); try { if (ch != null) { //生成NioSocketChannel buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { ch.close(); } return 0; } //server 应该没有write操作才对,因为server是一对多处理,不知道发给那一个clinet @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception {} }
    public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
        public NioSocketChannel(Channel parent, SocketChannel socket) {
            super(parent, socket);
            config = new NioSocketChannelConfig(this, socket.socket());
        }
        
        //////////////////////////////这部分是unsafe底层调用上层的实现//////////////////////////////////////////////
        @Override
        protected int doReadBytes(ByteBuf byteBuf) throws Exception {
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            //这里设置byteBuf写入数据坐标
            allocHandle.attemptedBytesRead(byteBuf.writableBytes());
            return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
        }
    
        @Override
        protected int doWriteBytes(ByteBuf buf) throws Exception {
            final int expectedWrittenBytes = buf.readableBytes();
            return buf.readBytes(javaChannel(), expectedWrittenBytes);
        }
    
        @Override
        protected void doWrite(ChannelOutboundBuffer in) throws Exception {
            for (;;) {
                int size = in.size();
                //没有数据退出
                if (size == 0) {
                    clearOpWrite();
                    break;
                }
                
                long writtenBytes = 0;    //记录写数据size
                boolean done = false;    //是否完成
                boolean setOpWrite = false;
    
    
                ByteBuffer[] nioBuffers = in.nioBuffers();
                int nioBufferCnt = in.nioBufferCount();
                long expectedWrittenBytes = in.nioBufferSize();
                SocketChannel ch = javaChannel();
    
                //这里有三种分支处理
                //如果没有ByteBuffer 有可能只发送几个byte
                //1跟default逻辑其实是一样的
                switch (nioBufferCnt) {
                    case 0:
                        //调用父类 AbstractNioByteChannel doWrite,逻辑基本相同,不同的是AbstractNioByteChannel处理的是byte 实现调用的是 doWriteBytes(ByteBuf buf)方法。。。
                        super.doWrite(in);
                        return;
                    case 1:
                        //这里只循环16次,可以看出是复制下面代码的哈。。。
                        ByteBuffer nioBuffer = nioBuffers[0];
                        for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                            final int localWrittenBytes = ch.write(nioBuffer);
                            if (localWrittenBytes == 0) {
                                setOpWrite = true;
                                break;
                            }
                            expectedWrittenBytes -= localWrittenBytes;
                            writtenBytes += localWrittenBytes;
                            if (expectedWrittenBytes == 0) {
                                done = true;
                                break;
                            }
                        }
                        break;
                    default:
                        //多个ByteBuffer时跟上面逻辑一样
                        for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                            final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                            if (localWrittenBytes == 0) {
                                setOpWrite = true;
                                break;
                            }
                            expectedWrittenBytes -= localWrittenBytes;
                            writtenBytes += localWrittenBytes;
                            if (expectedWrittenBytes == 0) {
                                done = true;
                                break;
                            }
                        }
                        break;
                }
    
                // Release the fully written buffers, and update the indexes of the partially written buffer.
                in.removeBytes(writtenBytes);
    
                if (!done) {
                    // Did not write all buffers completely.
                    incompleteWrite(setOpWrite);
                    break;
                }
            }
        }
    }
    public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
        //生成NioSocketChannel时就绑定 unsafe pipeline
        protected AbstractChannel(Channel parent) {
            this.parent = parent;
            id = newId();
            unsafe = newUnsafe();
            pipeline = newChannelPipeline();
        }
    }
    protected abstract class AbstractUnsafe implements Unsafe {
        private void register0(ChannelPromise promise) {
            try {
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                doRegister();
                // doRegister 是调用 AbstractNioChannel selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                neverRegistered = false;
                registered = true;
                //这里是添加 Handler 每个Handler会生成一个Context
                pipeline.invokeHandlerAddedIfNeeded();
    
                safeSetSuccess(promise);
                //通知Handler Registered
                pipeline.fireChannelRegistered();
                if (isActive()) {
                    if (firstRegistration) {
                        //通知Handler Active
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                //.......
            }
        }
    }

    小结:看似很复杂的Channel实现其实没想象难,大多数读写坐标记录交给ByteBuf处理掉了

    1.server每个client连接转换成NioSocketChannel对象

    2.构建NioSocketChannel时就已经生成 unsafe、pipeline

  • 相关阅读:
    668. Kth Smallest Number in Multiplication Table
    658. Find K Closest Elements
    483. Smallest Good Base
    475. Heaters
    454. 4Sum II
    441. Arranging Coins
    436. Find Right Interval
    410. Split Array Largest Sum
    392. Is Subsequence
    378. Kth Smallest Element in a Sorted Matrix
  • 原文地址:https://www.cnblogs.com/solq111/p/7066208.html
Copyright © 2011-2022 走看看