zoukankan      html  css  js  c++  java
  • Netty源码分析第5章(ByteBuf)---->第10节: SocketChannel读取数据过程

     

    Netty源码分析第五章: ByteBuf

     

    第十节: SocketChannel读取数据过程

    我们第三章分析过客户端接入的流程, 这一小节带大家剖析客户端发送数据, Server读取数据的流程:

    首先温馨提示, 这一小节高度耦合第三章的第1, 2节的内容, 很多知识这里并不会重复讲解, 如果对之前的知识印象不深刻建议恶补第三章的第1, 2节的内容之后再学习这一小节

    我们首先看NioEventLoop的processSelectedKey方法:

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        //获取到channel中的unsafe
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        //如果这个key不是合法的, 说明这个channel可能有问题
        if (!k.isValid()) {
            //代码省略
        }
        try {
            //如果是合法的, 拿到key的io事件
            int readyOps = k.readyOps();
            //链接事件
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                unsafe.finishConnect();
            }
            //写事件
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }
            //读事件和接受链接事件
            //如果当前NioEventLoop是work线程的话, 这里就是op_read事件
            //如果是当前NioEventLoop是boss线程的话, 这里就是op_accept事件
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    return;
                }
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

     if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) 这里的判断表示轮询到大事件是op_read或者op_accept事件

    之前的章节分析过, 如果当前NioEventLoop是work线程的话, 那么这里就是op_read事件, 也就是读事件, 表示客户端发来了数据流

    这里会调用unsafe的redis()方法进行读取

    如果是work线程, 那么这里的channel是NioServerSocketChannel, 其绑定的unsafe是NioByteUnsafe, 这里会走进NioByteUnsafe的read()方法中:

    public final void read() {
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);
    
            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        break;
                    }
    
                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                } while (allocHandle.continueReading());
    
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();
    
                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }

    首先获取SocketChannel的config, pipeline等相关属性

     final ByteBufAllocator allocator = config.getAllocator(); 这一步是获取一个ByteBuf的内存分配器, 用于分配ByteBuf

    这里会走到DefaultChannelConfig的getAllocator方法中:

    public ByteBufAllocator getAllocator() {
        return allocator;
    }

    这里返回的DefualtChannelConfig的成员变量, 我们看这个成员变量:

    private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;

    这里调用ByteBufAllocator的属性DEFAULT, 跟进去:

    ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;

    我们看到这里又调用了ByteBufUtil的静态属性DEFAULT_ALLOCATOR, 再跟进去:

    static final ByteBufAllocator DEFAULT_ALLOCATOR;

    DEFAULT_ALLOCATOR这个属性是在static块中初始化的

    我们跟到static块中:

    static {
        String allocType = SystemPropertyUtil.get(
                "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
        allocType = allocType.toLowerCase(Locale.US).trim();
    
        ByteBufAllocator alloc;
        if ("unpooled".equals(allocType)) {
            alloc = UnpooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: {}", allocType);
        } else if ("pooled".equals(allocType)) {
            alloc = PooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: {}", allocType);
        } else {
            alloc = PooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);
        }
        DEFAULT_ALLOCATOR = alloc;
        //代码省略
    }

    首先判断运行环境是不是安卓, 如果不是安卓, 在返回"pooled"字符串保存在allocType中

    然后通过if判断, 最后局部变量alloc = PooledByteBufAllocator.DEFAULT, 最后将alloc赋值到成员变量DEFAULT_ALLOCATOR

    我们跟到PooledByteBufAllocator的DEFAULT属性中:

    public static final PooledByteBufAllocator DEFAULT =
            new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());

    我们看到这里直接通过new的方式, 创建了一个PooledByteBufAllocator对象, 也就是基于申请一块连续内存进行缓冲区分配的缓冲区分配器

    缓冲区分配器的知识, 我们之前小节进行了详细的剖析, 这里就不再赘述

    回到NioByteUnsafe的read()方法中:

    public final void read() {
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);
    
            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        break;
                    }
    
                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                } while (allocHandle.continueReading());
    
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();
    
                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }

    这里 ByteBufAllocator allocator = config.getAllocator()中的allocator , 就是PooledByteBufAllocator

     final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle()  是创建一个handle, 我们之前的章节讲过, handle是对RecvByteBufAllocator进行实际操作的对象

    我们跟进recvBufAllocHandle:

    public RecvByteBufAllocator.Handle recvBufAllocHandle() {
        //如果不存在, 则创建一个handle的实例
        if (recvHandle == null) {
            recvHandle = config().getRecvByteBufAllocator().newHandle();
        }
        return recvHandle;
    }

    这里是我们之前剖析过的逻辑, 如果不存在, 则创建handle的实例, 具体创建过程我们可以回顾第三章的第二小节, 这里就不再赘述

    同样allocHandle.reset(config)是将配置重置, 第三章的第二小节也对其进行过剖析

    重置完配置之后, 进行do-while循环, 有关循环终止条件allocHandle.continueReading(), 之前小节也有过详细剖析, 这里也不再赘述

    在do-while循环中, 首先看 byteBuf = allocHandle.allocate(allocator) 这一步, 这里传入了刚才创建的allocate对象, 也就是PooledByteBufAllocator:

    这里会跑到DefaultMaxMessagesRecvByteBufAllocator类的allocate方法中:

    public ByteBuf allocate(ByteBufAllocator alloc) {
        return alloc.ioBuffer(guess());
    }

    这里的guess方法, 会调用AdaptiveRecvByteBufAllocator的guess方法:

    public int guess() {
        return nextReceiveBufferSize;
    }

    这里会返回AdaptiveRecvByteBufAllocator的成员变量nextReceiveBufferSize, 也就是下次所分配缓冲区的大小, 根据我们之前学习的内容, 第一次分配的时候会分配初始大小, 也就是1024字节

    回到DefaultMaxMessagesRecvByteBufAllocator类的allocate方法中:

    这样, alloc.ioBuffer(guess())就会分配一个PooledByteBuf

    我们跟到AbstractByteBufAllocator的ioBuffer方法中:

    public ByteBuf ioBuffer(int initialCapacity) {
        if (PlatformDependent.hasUnsafe()) {
            return directBuffer(initialCapacity);
        }
        return heapBuffer(initialCapacity);
    }

    这里首先判断是否能获取jdk的unsafe对象, 默认为true, 所以会走到directBuffer(initialCapacity)中, 这里最终会分配一个PooledUnsafeDirectByteBuf对象, 具体分配流程我们再之前小节做过详细剖析

    回到NioByteUnsafe的read()方法中:

    分配完了ByteBuf之后, 再看这一步allocHandle.lastBytesRead(doReadBytes(byteBuf)):

    首先看参数doReadBytes(byteBuf)方法, 这步是将channel中的数据读取到我们刚分配的ByteBuf中, 并返回读取到的字节数

    这里会调用到NioSocketChannel的doReadBytes方法:

    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.attemptedBytesRead(byteBuf.writableBytes());
        return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
    }

    首先拿到绑定在channel中的handler, 因为我们已经创建了handle, 所以这里会直接拿到

    再看allocHandle.attemptedBytesRead(byteBuf.writableBytes())这步, byteBuf.writableBytes()返回byteBuf的可写字节数, 也就是最多能从channel中读取多少字节写到ByteBuf, allocate的attemptedBytesRead会把可写字节数设置到DefaultMaxMessagesRecvByteBufAllocator 类的attemptedBytesRead属性中

    跟到DefaultMaxMessagesRecvByteBufAllocator中的attemptedBytesRead我们会看到:

    public void attemptedBytesRead(int bytes) {
        attemptedBytesRead = bytes;
    }

     

    继续看doReadBytes方法:

    最后, 通过byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead())将jdk底层的channel中的数据写入到我们创建的ByteBuf中, 并返回实际写入的字节数

    回到NioByteUnsafe的read()方法中:

    继续看allocHandle.lastBytesRead(doReadBytes(byteBuf))这步

    刚才我们剖析过doReadBytes(byteBuf)返回的是世界写入ByteBuf的字节数

    再看lastBytesRead方法, 跟到DefaultMaxMessagesRecvByteBufAllocator的lastBytesRead方法中:

    public final void lastBytesRead(int bytes) {
        lastBytesRead = bytes;
        totalBytesRead += bytes;
        if (totalBytesRead < 0) {
            totalBytesRead = Integer.MAX_VALUE;
        }
    }

    这里会赋值两个属性, lastBytesRead代表最后读取的字节数, 这里赋值为我们刚才写入ByteBuf的字节数, totalBytesRead表示总共读取的字节数, 这里将写入的字节数追加

    继续看NioByteUnsafe的read()方法:

    如果最后一次读取数据为0, 说明已经将channel中的数据全部读取完毕, 将新创建的ByteBuf释放循环利用, 并跳出循环

    allocHandle.incMessagesRead(1)这步是增加消息的读取次数, 因为我们循环最多16次, 所以当增加消息次数增加到16会结束循环

    读取完毕之后, 会通过pipeline.fireChannelRead(byteBuf)将传递channelRead事件, 有关channelRead事件, 我们在第四章也进行了详细的剖析

    这里读者会有疑问, 如果一次读取不完, 就传递channelRead事件, 那么server接收到的数据有可能就是不完整的, 其实关于这点, netty也做了相应的处理, 我们会在之后的章节详细剖析netty的半包处理机制

     

    循环结束后, 会执行到allocHandle.readComplete()这一步

    我们知道第一次分配ByteBuf的初始容量是1024, 但是初始容量不一定一定满足所有的业务场景, netty中, 将每次读取数据的字节数进行记录, 然后之后次分配ByteBuf的时候, 容量会尽可能的符合业务场景所需要大小, 具体实现方式, 就是在readComplete()这一步体现的

    我们跟到AdaptiveRecvByteBufAllocator的readComplete()方法中:

    public void readComplete() {
        record(totalBytesRead());
    }

    这里调用了record方法, 并且传入了这一次所读取的字节总数

    跟到record方法中:

    private void record(int actualReadBytes) { 
        if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) { 
            if (decreaseNow) { 
                index = Math.max(index - INDEX_DECREMENT, minIndex); 
                nextReceiveBufferSize = SIZE_TABLE[index];
                decreaseNow = false;
            } else {
                decreaseNow = true;
            }
        } else if (actualReadBytes >= nextReceiveBufferSize) { 
            index = Math.min(index + INDEX_INCREMENT, maxIndex); 
            nextReceiveBufferSize = SIZE_TABLE[index];
            decreaseNow = false;
        }
    }

    首先看判断条件 if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) 

    这里index是当前分配的缓冲区大小所在的SIZE_TABLE中的索引, 将这个索引进行缩进, 然后根据缩进后的所以找出SIZE_TABLE中所存储的内存值, 再判断是否大于等于这次读取的最大字节数, 如果条件成立, 说明分配的内存过大, 需要缩容操作, 我们看if块中缩容相关的逻辑

    首先 if (decreaseNow) 会判断是否立刻进行收缩操作, 通常第一次不会进行收缩操作, 然后会将decreaseNow设置为true, 代表下一次直接进行收缩操作

    假设需要立刻进行收缩操作, 我们看收缩操作的相关逻辑:

     index = Math.max(index - INDEX_DECREMENT, minIndex) 这一步将索引缩进一步, 但不能小于最小索引值

    然后通过 nextReceiveBufferSize = SIZE_TABLE[index] 获取设置索引之后的内存, 赋值在nextReceiveBufferSize, 也就是下次需要分配的大小, 下次就会根据这个大小分配ByteBuf了, 这样就实现了缩容操作

    再看 else if (actualReadBytes >= nextReceiveBufferSize) 

    这里判断这次读取字节的总量比上次分配的大小还要大, 则进行扩容操作

    扩容操作也很简单, 索引步进, 然后拿到步进后的索引所对应的内存值, 作为下次所需要分配的大小

    NioByteUnsafe的read()方法中:

    经过了缩容或者扩容操作之后, 通过pipeline.fireChannelReadComplete()传播ChannelReadComplete()事件

    以上就是读取客户端消息的相关流程

     

     

    第五章总结

     

            本章主要剖析了ByteBuf的基本操作以及缓冲区分配等相关知识.

     

            缓冲区分配, 分为通过调用jdk的api的方式和分配一块连续内存的方式

     

            其中, 通过分配连续内存的方式分配缓冲区中, 又介绍了在page级别分配的逻辑和在subpage级别分配的逻辑

     

            page级别分配时通过操作内存二叉树的方式记录分配情况

     

            subpage级别分配是通过位图的方式记录分配情况

     

            最后介绍了NioSocketChannel处理读事件的相关逻辑

     

            总体来说, 这一章的内容难度是比较大的, 希望同学课后通过多调试的方式进行熟练掌握

     

     上一节: ByteBuf的回收

     下一节: ByteToMessageDecoder

     

  • 相关阅读:
    数据库用户管理
    数据库(视图,事件,触发器,函数,存储,变量)
    我的博客即将同步至腾讯云+社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=3cp8ng15g94wc
    Python3安装与使用urllib2包之小坑
    移动端的vw px rem之间换算
    H5移动端开发
    实现拖拽复制和可排序的react.js组件
    vue2实现搜索结果中的搜索关键字高亮
    来,了解一下Java内存模型(JMM)
    video标签学习使用
  • 原文地址:https://www.cnblogs.com/xiangnan6122/p/10205889.html
Copyright © 2011-2022 走看看