zoukankan      html  css  js  c++  java
  • [编织消息框架][netty源码分析]8 Channel 实现类NioSocketChannel职责与实现

    Unsafe是托委访问socket,那么Channel是直接提供给开发者使用的

    Channel 主要有两个实现 NioServerSocketChannel同NioSocketChannel 致于其它不常用不在研究范围内

    NioServerSocketChannel 是给server用的,程序由始至终只有一个NioServerSocketChannel

    NioSocketChannel 是给客户端用的,每个连接生成一个NioSocketChannel 对象

    NioSocketChannel同NioSocketChannel的继承关系

    NioSocketChannel -> AbstractNioByteChannel -> AbstractNioChannel -> AbstractChannel

    NioServerSocketChannel -> AbstractNioMessageChannel-> AbstractNioChannel -> AbstractChannel

    小提示:如果看文字不够直观可以在eclipse里按快捷键 选择类 ctrl+t 

    channel有unsafe相应的实现类,反之亦是。其实功能是很简单的,划分太多对象目的是对某部分功能重用,有时也可能因过渡设计造成

    对于channel我们主要分析 I/O read/write操作

    public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel {
        private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
    
        //构造时就绑定SelectorProvider,然后注册OP_ACCEPT
        public NioServerSocketChannel() {
            this(newSocket(DEFAULT_SELECTOR_PROVIDER));
        }
        
        public NioServerSocketChannel(ServerSocketChannel channel) {
            super(null, channel, SelectionKey.OP_ACCEPT);
            config = new NioServerSocketChannelConfig(this, javaChannel().socket());
        }
        
     
        /**
        server read操作对应的为readMessages
        参数是个数组,是C语言书写风格,如果需要返回多种类型数据,那么传个对象进去外部就能获取到
        这里比较重要,当有接收到socket时,生成NioSocketChannel对象
       读者如果还有印象的话在讲NioEventLoop 有提示netty read 操作是不分 OP_ACCEPT、OP_READ的,可以在这方法打上断点观察
    */ @Override protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = javaChannel().accept(); try { if (ch != null) { //生成NioSocketChannel buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { ch.close(); } return 0; } //server 应该没有write操作才对,因为server是一对多处理,不知道发给那一个clinet @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception {} }
    public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
        public NioSocketChannel(Channel parent, SocketChannel socket) {
            super(parent, socket);
            config = new NioSocketChannelConfig(this, socket.socket());
        }
        
        //////////////////////////////这部分是unsafe底层调用上层的实现//////////////////////////////////////////////
        @Override
        protected int doReadBytes(ByteBuf byteBuf) throws Exception {
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            //这里设置byteBuf写入数据坐标
            allocHandle.attemptedBytesRead(byteBuf.writableBytes());
            return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
        }
    
        @Override
        protected int doWriteBytes(ByteBuf buf) throws Exception {
            final int expectedWrittenBytes = buf.readableBytes();
            return buf.readBytes(javaChannel(), expectedWrittenBytes);
        }
    
        @Override
        protected void doWrite(ChannelOutboundBuffer in) throws Exception {
            for (;;) {
                int size = in.size();
                //没有数据退出
                if (size == 0) {
                    clearOpWrite();
                    break;
                }
                
                long writtenBytes = 0;    //记录写数据size
                boolean done = false;    //是否完成
                boolean setOpWrite = false;
    
    
                ByteBuffer[] nioBuffers = in.nioBuffers();
                int nioBufferCnt = in.nioBufferCount();
                long expectedWrittenBytes = in.nioBufferSize();
                SocketChannel ch = javaChannel();
    
                //这里有三种分支处理
                //如果没有ByteBuffer 有可能只发送几个byte
                //1跟default逻辑其实是一样的
                switch (nioBufferCnt) {
                    case 0:
                        //调用父类 AbstractNioByteChannel doWrite,逻辑基本相同,不同的是AbstractNioByteChannel处理的是byte 实现调用的是 doWriteBytes(ByteBuf buf)方法。。。
                        super.doWrite(in);
                        return;
                    case 1:
                        //这里只循环16次,可以看出是复制下面代码的哈。。。
                        ByteBuffer nioBuffer = nioBuffers[0];
                        for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                            final int localWrittenBytes = ch.write(nioBuffer);
                            if (localWrittenBytes == 0) {
                                setOpWrite = true;
                                break;
                            }
                            expectedWrittenBytes -= localWrittenBytes;
                            writtenBytes += localWrittenBytes;
                            if (expectedWrittenBytes == 0) {
                                done = true;
                                break;
                            }
                        }
                        break;
                    default:
                        //多个ByteBuffer时跟上面逻辑一样
                        for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                            final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                            if (localWrittenBytes == 0) {
                                setOpWrite = true;
                                break;
                            }
                            expectedWrittenBytes -= localWrittenBytes;
                            writtenBytes += localWrittenBytes;
                            if (expectedWrittenBytes == 0) {
                                done = true;
                                break;
                            }
                        }
                        break;
                }
    
                // Release the fully written buffers, and update the indexes of the partially written buffer.
                in.removeBytes(writtenBytes);
    
                if (!done) {
                    // Did not write all buffers completely.
                    incompleteWrite(setOpWrite);
                    break;
                }
            }
        }
    }
    public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
        //生成NioSocketChannel时就绑定 unsafe pipeline
        protected AbstractChannel(Channel parent) {
            this.parent = parent;
            id = newId();
            unsafe = newUnsafe();
            pipeline = newChannelPipeline();
        }
    }
    protected abstract class AbstractUnsafe implements Unsafe {
        private void register0(ChannelPromise promise) {
            try {
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                doRegister();
                // doRegister 是调用 AbstractNioChannel selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                neverRegistered = false;
                registered = true;
                //这里是添加 Handler 每个Handler会生成一个Context
                pipeline.invokeHandlerAddedIfNeeded();
    
                safeSetSuccess(promise);
                //通知Handler Registered
                pipeline.fireChannelRegistered();
                if (isActive()) {
                    if (firstRegistration) {
                        //通知Handler Active
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                //.......
            }
        }
    }

    小结:看似很复杂的Channel实现其实没想象难,大多数读写坐标记录交给ByteBuf处理掉了

    1.server每个client连接转换成NioSocketChannel对象

    2.构建NioSocketChannel时就已经生成 unsafe、pipeline

  • 相关阅读:
    记一次网络实践
    python中得公有和私有——私有函数和公开函数_补充完整
    机器学习 之LightGBM算法
    机器学习 之XGBoost算法
    机器学习 之梯度提升树GBDT
    机器学习 之KNN近邻法
    机器学习之 XGBoost和LightGBM
    《剑指offer》 之二叉树
    随机森林RF、XGBoost、GBDT和LightGBM的原理和区别
    机器学习之决策树_CART算法
  • 原文地址:https://www.cnblogs.com/solq111/p/7066208.html
Copyright © 2011-2022 走看看