zoukankan      html  css  js  c++  java
  • Netty

    1. 学习目的

    1. BIO、NIO和AIO的区别?
    2. NIO的组成?
    3. Netty的特点?
    4. Netty的线程模型?
    5. TCP 粘包/拆包的原因及解决方法?
    6. 了解哪几种序列化协议?
    7. 如何选择序列化协议?
    8. Netty的零拷贝实现?
    9. Netty的高性能表现在哪些方面?
    10. NIOEventLoopGroup源码?

    2.学习Netty

    1. 为什么学习Netty
      分布式的时代 分布式框架中要用到
      Spring5 去servlet化,底层使用netty
      SpringBoot 内部实现web容器
      Zookeeper 底层通信
      Dubbo 分布式服务框架,多协议支持(RPC) Netty
      为成为架构师筑基
    2. Netty能帮助我们解决什么问题?
      框架:简化开发一系列解决方案的集合
      封装IO操作的框架
      复杂的业务场景中,没有说用一个单独的IO APi,经常遇到的问题:手写线程(Thread),多线程处理、性能问题、单双工,影响业务的开发。
      IO + 多线程来解决问题
      类似的框架:Mina Netty的前身(不是一个人开发的)
    3. 为什么要封装IO操作
      阻塞和非阻塞:
      自己回答:处理端在处理数据的时候,如果被处理对象的数据没有准备好,线程会阻塞在当前线程,譬如socket.accept()方法 ,在等待网络连接的时候,如果没有请求连接,线程会处于阻塞状态,如果是非阻塞模型,当前线程可以去处理其他事物

    Input Output
    是相对于内存而言
    磁盘只是input output的一端,除了磁盘还有网络

    IO模型

    阻塞非阻塞:参照IO操作 读数据或取数据时候的一种处理机制

    BIO阻塞:

    NIO:
    通道 Buffer 轮询

    同步与异步

    在处理数据的时候,在同一时间点能同时做多个处理:异步
    在同一时间只能做一个处理:同步

    • 多路复用体现:
      ServerSocketChannel
      SocketChannle (从多路复用器中拿到客户端的引用)
      都来自于 (SelectionKey)key.channel

    NIO的操作过于繁琐,于是有了Netty,对NIO进行了封装。

    IO(BIO)Block IO 同步阻塞IO
    NIO Non-Block IO 同步非阻塞IO (可以使用线程池的方式,实现异步)
    AIO Async IO 异步非阻塞IO (事件驱动,回调实现异步)

    3. NIO

    3.1 NIO的核心组件(三件套)

    1. Buffer、Selector、Channel、

    3.1.1 Buffer

    1. 缓冲区属性
      容量:capacity:表示该缓冲区可以保存多少数据。
      极限:limit:表示缓冲区的当前终点,不能对缓冲区中超过极限的数据进行读写操作。极限可以修改,有利于缓冲区的重用。
      位置:position:表示缓冲区中下一个读写单元的位置,每次读写缓冲区的数据时,都会改变位置值,为下一次读写数据做准备。
      属性关系为 capacity>=limit>=position>=0
    2. 改变缓冲区3个属性的方法
      clear():把极限限制为容量值,再把位置设为0.
      flip():把极限值设为位置值,再把位置设为0.
      rewind():不改变极限,把位置设为0.
    3. ByteBuffer类没有提供公开的构造方法,但是提供了两个获得ByteBuffer实例的静态工厂方法。
      allocate(int capacity):返回一个ByteBuffer对象
      directAllocate(int capacity):返回一个ByteBuffer对象,称为直接缓冲区
    4. 所有的缓冲区都提供了读写缓冲区的方法
      get():相对读。从缓冲区的当前位置读取一个单元的数据,读完后把位置值加1。
      get(index):决对读。从参数index指定的位置读取一个单元的数据。
      put():相对写。向缓冲区的当前位置写入一个单元的数据,写完后把位置值加1.
      put(int index):绝对写。向参数index指定的位置写入一个单元的数据。
    • buffer操作的流程

    3.1.2 Selector

    3.1.3 Channel

    是多路复用的基础
    解决的问题:

    1. 请求过多,导致连接线程过多
    2. 处理数据较大,导致线程长时间占用 也会导致阻塞
      selector模型,IO调用不会被阻塞,

    3.2何为多路复用

    3.3Netty支持的功能与特性

    Netty

    Netty封装了NIO,Reactor模型,声明BOSSS线程,worker线程,可以修改BOSS线程和worker线程的数量

    1. 编码过程
      //Netty服务 //NIO //BIO
      ServerBootstrap ServerSocketChannel ServerSocket

    主线程处理类,这样的写法,底层使用反射

    子线程处理类 ,Handler

    无锁化串行编程

    编码器

    解码器

    业务处理逻辑

    针对主线程的最大配置数

    针对子线程的配置,保持长连接

    //SocketChannel的封装
    ChannelHandlerContext

    • 总结
      Netty就是一个同时支持多协议的网络通信框架
      官网:
    Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.
    Netty是一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。
    

    协议编解码
    2. Netty支持的协议
    官网图

    Netty的高性能

    比传统IO性能提升了8倍多。

    • 传统RPC调用性能差的三宗罪
    1. 阻塞IO不具备弹性伸缩能力(一个请求,一个应答),高并发导致宕机
    2. Java序列化编码、解码的性能问题,直接是字节流的读写
    3. 传统IO线程模型过多占用CPU资源
    • 高性能的三个主题:
      IO模型 传输
      数据协议 协议 (协议越简单,通信效率越高)
      线程模型 线程 (编解码的处理)
    • 异步非阻塞通信
      NioEventLoop聚合了多路复用器Selector,可以同时并发处理成百上千个客户端Channel,由于读写操作都是非阻塞的,可以充分提升IO线程的运行效率,避免由于频繁阻塞IO阻塞导致的线程挂起。
    • 零拷贝(直接缓冲区)
    1. 接收和发送ByteBuffer使用堆外内存直接内存进行socket读写。
    • AbstractNioByteChannel$NioByteUnsafe extends AbstractNioUnsafe
            public final void read() {
                final ChannelConfig config = config();
                if (shouldBreakReadReady(config)) {
                    clearReadPending();
                    return;
                }
                final ChannelPipeline pipeline = pipeline();
                final ByteBufAllocator allocator = config.getAllocator();
                final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
                allocHandle.reset(config);
    
                ByteBuf byteBuf = null;
                boolean close = false;
                try {
                    do {
                        byteBuf = allocHandle.allocate(allocator);
                        allocHandle.lastBytesRead(doReadBytes(byteBuf));
                        if (allocHandle.lastBytesRead() <= 0) {
                            // nothing was read. release the buffer.
                            byteBuf.release();
                            byteBuf = null;
                            close = allocHandle.lastBytesRead() < 0;
                            if (close) {
                                // There is nothing left to read as we received an EOF.
                                readPending = false;
                            }
                            break;
                        }
    
                        allocHandle.incMessagesRead(1);
                        readPending = false;
                        pipeline.fireChannelRead(byteBuf);
                        byteBuf = null;
                    } while (allocHandle.continueReading());
    
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
    
                    if (close) {
                        closeOnRead(pipeline);
                    }
                } catch (Throwable t) {
                    handleReadException(pipeline, byteBuf, t, close, allocHandle);
                } finally {
                    // Check if there is a readPending which was not processed yet.
                    // This could be for two reasons:
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                    //
                    // See https://github.com/netty/netty/issues/2254
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
    
    

    public abstract class DefaultMaxMessagesRecvByteBufAllocator

            public ByteBuf allocate(ByteBufAllocator alloc) {
                return alloc.ioBuffer(guess());
            }
    
    • public abstract class ByteBuf implements ReferenceCounted, Comparable
    1. 提供了组合Buffer对象,可以聚合多个ByteBuffer对象。
    public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements Iterable<ByteBuf>{
        private static final ByteBuffer EMPTY_NIO_BUFFER = Unpooled.EMPTY_BUFFER.nioBuffer();
        private static final Iterator<ByteBuf> EMPTY_ITERATOR = Collections.<ByteBuf>emptyList().iterator();
    
        private final ByteBufAllocator alloc;
        private final boolean direct;
        private final int maxNumComponents;
    }
    
    

    添加 ByteBuf,不需要做内存拷贝,相关代码如下:

        private CompositeByteBuf addComponents0(boolean increaseWriterIndex,
                final int cIndex, ByteBuf[] buffers, int arrOffset) {
            final int len = buffers.length, count = len - arrOffset;
    
            int readableBytes = 0;
            int capacity = capacity();
            for (int i = 0; i < buffers.length; i++) {
                readableBytes += buffers[i].readableBytes();
    
                // Check if we would overflow.
                // See https://github.com/netty/netty/issues/10194
                checkForOverflow(capacity, readableBytes);
            }
            // only set ci after we've shifted so that finally block logic is always correct
            int ci = Integer.MAX_VALUE;
            try {
                checkComponentIndex(cIndex);
                shiftComps(cIndex, count); // will increase componentCount
                int nextOffset = cIndex > 0 ? components[cIndex - 1].endOffset : 0;
                for (ci = cIndex; arrOffset < len; arrOffset++, ci++) {
                    ByteBuf b = buffers[arrOffset];
                    if (b == null) {
                        break;
                    }
                    Component c = newComponent(ensureAccessible(b), nextOffset);
                    components[ci] = c;
                    nextOffset = c.endOffset;
                }
                return this;
            } finally {
                // ci is now the index following the last successfully added component
                if (ci < componentCount) {
                    if (ci < cIndex + count) {
                        // we bailed early
                        removeCompRange(ci, cIndex + count);
                        for (; arrOffset < len; ++arrOffset) {
                            ReferenceCountUtil.safeRelease(buffers[arrOffset]);
                        }
                    }
                    updateComponentOffsets(ci); // only need to do this here for components after the added ones
                }
                if (increaseWriterIndex && ci > cIndex && ci <= componentCount) {
                    writerIndex += components[ci - 1].endOffset - components[cIndex].offset;
                }
            }
        }
    
    
    1. transfreTo()直接将文件缓冲区的数据发送到目标Channel,避免循环导致内存拷贝的问题。
    public class DefaultFileRegion extends AbstractReferenceCounted implements FileRegion {
        public long transferTo(WritableByteChannel target, long position) throws IOException {
            long count = this.count - position;
            if (count < 0 || position < 0) {
                throw new IllegalArgumentException(
                        "position out of range: " + position +
                        " (expected: 0 - " + (this.count - 1) + ')');
            }
            if (count == 0) {
                return 0L;
            }
            if (refCnt() == 0) {
                throw new IllegalReferenceCountException(0);
            }
            // Call open to make sure fc is initialized. This is a no-oop if we called it before.
            open();
    
            long written = file.transferTo(this.position + position, count, target);
            if (written > 0) {
                transferred += written;
            } else if (written == 0) {
                // If the amount of written data is 0 we need to check if the requested count is bigger then the
                // actual file itself as it may have been truncated on disk.
                //
                // See https://github.com/netty/netty/issues/8868
                validate(this, position);
            }
            return written;
        }
    }
    

    Netty 文件传输 DefaultFileRegion 通过 transferTo()方法将文件发送到目标 Channel 中,下面重点看 FileChannel 的 transferTo()方法,它的 API DOC 说明如下:

    public abstract class FileChannel{
        public abstract long transferTo(long var1, long var3, WritableByteChannel var5) throws IOException;
    }
    

    对于很多操作系统它直接将文件缓冲区的内容发送到目标 Channel 中,而不需要通过拷贝的方式,这是一种更加高效 的传输方式,它实现了文件传输的“零拷贝”。

    • 内存池

    三个维度

    1. Pooled与unPooled(池化与非池化)
      性能测试经验表明,采用内存池的 ByteBuf 相比于朝生夕灭的 ByteBuf,性能高 23 倍左右(性能数据与使用场景强相 关)。
      UnPooled的内存分配
    public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
        public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
            if (initialCapacity == 0 && maxCapacity == 0) {
                return emptyBuf;
            }
            validate(initialCapacity, maxCapacity);
            return newDirectBuffer(initialCapacity, maxCapacity);
        }
    }
    
    • newDirectBuffer
    public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
        protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
            PoolThreadCache cache = threadCache.get();
            PoolArena<ByteBuffer> directArena = cache.directArena;
    
            final ByteBuf buf;
            if (directArena != null) {
                buf = directArena.allocate(cache, initialCapacity, maxCapacity);
            } else {
                buf = PlatformDependent.hasUnsafe() ?
                        UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                        new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
            }
    
            return toLeakAwareBuffer(buf);
        }
    }
    
    • PoolArena的allocate
    abstract class PoolArena<T> extends SizeClasses implements PoolArenaMetric {
        PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
            PooledByteBuf<T> buf = newByteBuf(maxCapacity);
            allocate(cache, buf, reqCapacity);
            return buf;
        }
    }
    
    abstract class PoolArena<T> extends SizeClasses implements PoolArenaMetric {
            protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
                if (HAS_UNSAFE) {
                    return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
                } else {
                    return PooledDirectByteBuf.newInstance(maxCapacity);
                }
            }
    }
    
    final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {
    
        
        static PooledDirectByteBuf newInstance(int maxCapacity) {
            PooledDirectByteBuf buf = RECYCLER.get();
            buf.reuse(maxCapacity);
            return buf;
        }
    }
    

    通过 RECYCLER 的 get 方法循环使用 ByteBuf 对象,如果是非内存池实现,则直接创建一个新的 ByteBuf 对象。从缓 冲池中获取 ByteBuf 之后,调用 AbstractReferenceCountedByteBuf 的 setRefCnt 方法设置引用计数器,用于对象的引 用计数和内存回收(类似 JVM 垃圾回收机制)。
    2. UnSage和非UnSafe(底层读写与应用程序读写)

    1. Heap与Direct(堆内存与堆外内存)
      netty有pooled buffer 和 unpooled buffer
      pooled 先初始化一定大小的空间 ,读写效率 ,内存分配 堆外内存读写高,但是内存管理比较麻烦。
    • 高效的Reactor线程模型
    1. Reactor单线程模型
      netty的接收连接,分发调度、读写都由一个线程完成。
    2. Reactor多线程模型
      反应接待线程、反应线程池(处理读写,编解码)
    3. 主从Reactor多线程模型
      接收线程 主线程池(调度不同模块业务的资源) 派发不同的业务逻辑线程
      从线程
    • 无锁化的串行设计理念
      Pipeline(管道) pipeline的添加是有顺序的。
      pipeline的工作模式:责任链模式,双向链表 两种类型的handle
      Inbound OutBound 负责数据处理的触发和回调 什么时候触发,什么时候回调
      pipeline可以巧妙的解决权限问题,有一个好用的东西,事件传播,异常和消息可以一层层往上传。

    MessageToMessageEncoder extends ChannelOutboundHandlerAdapter 是一个OutBound,OutBound说明数据需要一个回调处理了。

    • 高效的并发编程 Netty的高效并发编程主要体现在如下几点:
    1. volatile的大量、正确使用
    2. CAS和原子类的广泛使用
    3. 线程安全容器的使用
    4. 通过读写锁提升并发性能
    • 高性能的序列化框架
      影响序列化性能的关键因素总结如下:
    1. 序列化后的码流大小(影响网络带宽的占用)
    2. 序列化&反序列化的性能(CPU资源占用)
    3. 是否支持跨语言(异构系统的对接和开发语言切换)
    • 灵活的TCP参数配置能力

    Netty核心之服务端Channel源码分析

    服务端Channel的创建
    bind()[用户代码入口]
    initAndRegister()[初始化并注册]
    newChannel()[创建服务端channel]
    init()[初始化服务端Channel]

    配置都会保存在 channel.config

    • ServerBootStrap
        @Override
        void init(Channel channel) {
            setChannelOptions(channel, newOptionsArray(), logger);
            setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
    
            ChannelPipeline p = channel.pipeline();
    
            final EventLoopGroup currentChildGroup = childGroup;
            final ChannelHandler currentChildHandler = childHandler;
            final Entry<ChannelOption<?>, Object>[] currentChildOptions;
            synchronized (childOptions) {
                currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
            }
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
    
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(final Channel ch) {
                    final ChannelPipeline pipeline = ch.pipeline();
                    ChannelHandler handler = config.handler();
                    if (handler != null) {
                        pipeline.addLast(handler);
                    }
    
                    ch.eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.addLast(new ServerBootstrapAcceptor(
                                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                        }
                    });
                }
            });
        }
    
    

    NioServerSocketChannel
    public class NioServerSocketChannel extends AbstractNioMessageChannel
    implements io.netty.channel.socket.ServerSocketChannel

    pipeline 初始化
    反射创建

    客户端BootStrap

    Channel简介

    在 Netty 中,Channel 是一个 Socket 的抽象,它为用户提供了关于 Socket 状态(是否是连接还是断开)以及对 Socket 的读写等操作。每当 Netty 建立了一个连接后, 都创建一个对应的 Channel 实例。 除了 TCP 协议以外,Netty 还支持很多其他的连接协议, 并且每种协议还有 NIO(非阻塞 IO)和 OIO(Old-IO, 即传统的 阻塞 IO)版本的区别。不同协议不同的阻塞类型的连接都有不同的 Channel 类型与之对应

    NioSocketChannel的创建

    Bootstrap 是 Netty 提供的一个便利的工厂类, 我们可以通过它来完成 Netty 的客户端或服务器端的 Netty 初始化。
    Netty 客户端初始化时所需的所有内容:

    1、EventLoopGroup:不论是服务器端还是客户端, 都必须指定 EventLoopGroup。在这个例子中, 指定了 NioEventLoopGroup, 表示一个 NIO 的 EventLoopGroup。
    2、ChannelType: 指定 Channel 的类型。 因为是客户端,因此使用了 NioSocketChannel。
    3、Handler: 设置处理数据的 Handler。
    客户端通过 Bootstrap 启动后,都做了哪些工作?
    NioSocketChannel 的 类层次结构
    客户端连接代码的初始化 Bootstrap 中调用了一个 channel()方法,传入的参数是 NioSocketChannel.class,
    在这个方法中其实就是初始化了一个 ReflectiveChannelFactory 的对象:
    而 ReflectiveChannelFactory 实现了 ChannelFactory 接口, 它提供了唯一的方法, 即 newChannel()方法,
    ChannelFactory, 顾名思义, 就是创建 Channel 的工厂类。进入到 ReflectiveChannelFactory 的 newChannel()方法中, 我们看到其实现代码如下:

    public T newChannel() { // 删除了 try...catch 块 return clazz.newInstance(); }
    
    

    1、Bootstrap 中的 ChannelFactory 实现类是 ReflectiveChannelFactory。
    2、通过 channel()方法创建的 Channel 具体类型是 NioSocketChannel。

    Channel 的实例化过程其实就是调用 ChannelFactory 的 newChannel()方法,而实例化的 Channel 具体类型又是和初 始化 Bootstrap 时传入的 channel()方法的参数相关。因此对于客户端的 Bootstrap 而言,创建的 Channel 实例就是 NioSocketChannel。

    客户端 Channel 的初始化

    至此, NioSocketChannel 就初始化就完成了, 我们可以稍微总结一下 NioSocketChannel 初始化所做的工作内容:
    1、调用 NioSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER)打开一个新的 Java NIOSocketChannel。
    2、AbstractChannel(Channel parent)中需要初始化的属性:

    id:每个 Channel 都拥有一个唯一的 id。
    parent:属性置为 null。
    unsafe:通过 newUnsafe()实例化一个 unsafe 对象,它的类型是 AbstractNioByteChannel.NioByteUnsafe 内部类。
    pipeline:是 new DefaultChannelPipeline(this)新创建的实例。

    3、AbstractNioChannel 中的属性:
    ch:赋值为 Java SocketChannel,即 NioSocketChannel 的 newSocket()方法返回的 Java NIO SocketChannel。
    readInterestOp:赋值为 SelectionKey.OP_READ
    ch:被配置为非阻塞,即调用 ch.configureBlocking(false)。

    4、NioSocketChannel 中的属性:
    config = new NioSocketChannelConfig(this, socket.socket())

    Unsafe 字段的初始化

    在实例化 NioSocketChannel 的过程中,会在父类 AbstractChannel 的构造方法中调用 newUnsafe()来获取一个 unsafe 实例。那么 unsafe 是怎么初始化的呢? 它的作用是什么?

    其实 unsafe 特别关键,它封装了对 Java 底层 Socket 的操作,因此实际上是沟通 Netty 上层和 Java 底层的重要的 桥梁。

        interface Unsafe {
    
            /**
             * Return the assigned {@link RecvByteBufAllocator.Handle} which will be used to allocate {@link ByteBuf}'s when
             * receiving data.
             */
            RecvByteBufAllocator.Handle recvBufAllocHandle();
    
            /**
             * Return the {@link SocketAddress} to which is bound local or
             * {@code null} if none.
             */
            SocketAddress localAddress();
    
            /**
             * Return the {@link SocketAddress} to which is bound remote or
             * {@code null} if none is bound yet.
             */
            SocketAddress remoteAddress();
    
            /**
             * Register the {@link Channel} of the {@link ChannelPromise} and notify
             * the {@link ChannelFuture} once the registration was complete.
             */
            void register(EventLoop eventLoop, ChannelPromise promise);
    
            /**
             * Bind the {@link SocketAddress} to the {@link Channel} of the {@link ChannelPromise} and notify
             * it once its done.
             */
            void bind(SocketAddress localAddress, ChannelPromise promise);
    
            /**
             * Connect the {@link Channel} of the given {@link ChannelFuture} with the given remote {@link SocketAddress}.
             * If a specific local {@link SocketAddress} should be used it need to be given as argument. Otherwise just
             * pass {@code null} to it.
             *
             * The {@link ChannelPromise} will get notified once the connect operation was complete.
             */
            void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
    
            /**
             * Disconnect the {@link Channel} of the {@link ChannelFuture} and notify the {@link ChannelPromise} once the
             * operation was complete.
             */
            void disconnect(ChannelPromise promise);
    
            /**
             * Close the {@link Channel} of the {@link ChannelPromise} and notify the {@link ChannelPromise} once the
             * operation was complete.
             */
            void close(ChannelPromise promise);
    
            /**
             * Closes the {@link Channel} immediately without firing any events.  Probably only useful
             * when registration attempt failed.
             */
            void closeForcibly();
    
            /**
             * Deregister the {@link Channel} of the {@link ChannelPromise} from {@link EventLoop} and notify the
             * {@link ChannelPromise} once the operation was complete.
             */
            void deregister(ChannelPromise promise);
    
            /**
             * Schedules a read operation that fills the inbound buffer of the first {@link ChannelInboundHandler} in the
             * {@link ChannelPipeline}.  If there's already a pending read operation, this method does nothing.
             */
            void beginRead();
    
            /**
             * Schedules a write operation.
             */
            void write(Object msg, ChannelPromise promise);
    
            /**
             * Flush out all write operations scheduled via {@link #write(Object, ChannelPromise)}.
             */
            void flush();
    
            /**
             * Return a special ChannelPromise which can be reused and passed to the operations in {@link Unsafe}.
             * It will never be notified of a success or error and so is only a placeholder for operations
             * that take a {@link ChannelPromise} as argument but for which you not want to get notified.
             */
            ChannelPromise voidPromise();
    
            /**
             * Returns the {@link ChannelOutboundBuffer} of the {@link Channel} where the pending write requests are stored.
             */
            ChannelOutboundBuffer outboundBuffer();
        }
    }
    
    

    这些方法其实都是对应到相关的 Java 底层的 Socket 的操作。
    AbstractChannel 的构造方法中,在这里调用了 newUnsafe()获取一个新的 unsafe 对象,而 newUnsafe() 方法在 NioSocketChannel 中被重写了。来看代码:

    • NioSocketChannel
        protected AbstractNioUnsafe newUnsafe() {
            return new NioSocketChannelUnsafe();
        }
    

    Pipeline 的初始化

    上面我们分析了 NioSocketChannel 的大体初始化过程, 但是我们漏掉了一个关键的部分,即 ChannelPipeline 的 初始化。在 Pipeline 的注释说明中写到“Each channel has its own pipeline and it is created automatically when a new channel is created.”,我们知道,在实例化一个 Channel 时,必然都要实例化一个 ChannelPipeline。而我们确实在 AbstractChannel 的构造器看到了 pipeline 字段被初始化为 DefaultChannelPipeline 的实例。那么我们就来看一下, DefaultChannelPipeline 构造器做了哪些工作。
    *

    public class DefaultChannelPipeline implements ChannelPipeline {
        protected DefaultChannelPipeline(Channel channel) {
            this.channel = ObjectUtil.checkNotNull(channel, "channel");
            succeededFuture = new SucceededChannelFuture(channel, null);
            voidPromise =  new VoidChannelPromise(channel, true);
    
            tail = new TailContext(this);
            head = new HeadContext(this);
    
            head.next = tail;
            tail.prev = head;
        }
    }
    
    

    DefaultChannelPipeline 的构造器需要传入一个 channel,而这个 channel 其实就是我们实例化的 NioSocketChannel, DefaultChannelPipeline 会将这个 NioSocketChannel 对象保存在 channel 字段中。DefaultChannelPipeline 中还有两 个特殊的字段,即 head 和 tail,这两个字段是双向链表的头和尾。其实在 DefaultChannelPipeline 中,维护了一个以 AbstractChannelHandlerContext 为节点元素的双向链表,这个链表是 Netty 实现 Pipeline 机制的关键。

    HeadContext 的类继承层次结构:

    TailContext 的继承层次结构

    链表中 head 是一个 ChannelOutboundHandler,而 tail 则是一个 ChannelInboundHandler。
    HeadContext 的构造器:

            HeadContext(DefaultChannelPipeline pipeline) {
                super(pipeline, null, HEAD_NAME, HeadContext.class);
                unsafe = pipeline.channel().unsafe();
                setAddComplete();
            }
    

    它调用了父类 AbstractChannelHandlerContext 的构造器,并传入参数 inbound = false,outbound = true。而 TailContext 的构造器与 HeadContext 的相反,它调用了父类 AbstractChannelHandlerContext 的构造器,并传入参数 inbound = true,outbound = false。即 header 是一个 OutBoundHandler,而 tail 是一个 InBoundHandler,关于这 一特征,大家要特别注意,先记住,后续我们分析到 Netty 的 Pipeline 时,我们会反复用到 inbound 和 outbound 这 两个属性。

    EventLoop 的初始化

    我们在一开始就实例化了一个 NioEventLoopGroup 对象,因此我们就从它的 构造器中追踪一下 EventLoop 的初始化过程。

    NioEventLoop 有几个重载的构造器,不过内容都没有太大的区别,最终都是调用的父类 MultithreadEventLoopGroup 的构造器:

        protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
            super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
        }
    

    如果我们传入的线程数 nThreads 是 0,那么 Netty 会为我们设置默认的线程数,DEFAULT_EVENT_LOOP_THREADS

    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                    "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    

    Netty 首先会从系统属性中获取"io.netty.eventLoopThreads"的值,如果我们没有设置的话,那么就返回默认值:即处理 器核心数 * 2。回到 MultithreadEventLoopGroup 构造器中会继续调用父类 MultithreadEventExecutorGroup 的构造器:

        protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
            if (nThreads <= 0) {
                throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
            }
    
            if (executor == null) {
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            }
    
            children = new EventExecutor[nThreads];
    
            for (int i = 0; i < nThreads; i ++) {
                boolean success = false;
                try {
                    children[i] = newChild(executor, args);
                    success = true;
                } catch (Exception e) {
                    // TODO: Think about if this is a good exception type
                    throw new IllegalStateException("failed to create a child event loop", e);
                } finally {
                    if (!success) {
                        for (int j = 0; j < i; j ++) {
                            children[j].shutdownGracefully();
                        }
    
                        for (int j = 0; j < i; j ++) {
                            EventExecutor e = children[j];
                            try {
                                while (!e.isTerminated()) {
                                    e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                                }
                            } catch (InterruptedException interrupted) {
                                // Let the caller handle the interruption.
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                    }
                }
            }
    
            chooser = chooserFactory.newChooser(children);
    
            final FutureListener<Object> terminationListener = new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (terminatedChildren.incrementAndGet() == children.length) {
                        terminationFuture.setSuccess(null);
                    }
                }
            };
    
            for (EventExecutor e: children) {
                e.terminationFuture().addListener(terminationListener);
            }
    
            Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
            Collections.addAll(childrenSet, children);
            readonlyChildren = Collections.unmodifiableSet(childrenSet);
        }
    
    

    newChooser 方法里面看看其实现逻辑,

    public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
    
        public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
    
        private DefaultEventExecutorChooserFactory() { }
    
        @SuppressWarnings("unchecked")
        @Override
        public EventExecutorChooser newChooser(EventExecutor[] executors) {
            if (isPowerOfTwo(executors.length)) {
                return new PowerOfTwoEventExecutorChooser(executors);
            } else {
                return new GenericEventExecutorChooser(executors);
            }
        }
    
        private static boolean isPowerOfTwo(int val) {
            return (val & -val) == val;
        }
    
        private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
            private final AtomicInteger idx = new AtomicInteger();
            private final EventExecutor[] executors;
    
            PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
                this.executors = executors;
            }
    
            @Override
            public EventExecutor next() {
                return executors[idx.getAndIncrement() & executors.length - 1];
            }
        }
    
        private static final class GenericEventExecutorChooser implements EventExecutorChooser {
            private final AtomicInteger idx = new AtomicInteger();
            private final EventExecutor[] executors;
    
            GenericEventExecutorChooser(EventExecutor[] executors) {
                this.executors = executors;
            }
    
            @Override
            public EventExecutor next() {
                return executors[Math.abs(idx.getAndIncrement() % executors.length)];
            }
        }
    }
    

    上面的代码逻辑主要表达的意思是:即如果 nThreads 是 2 的幂,则使用 PowerOfTwoEventExecutorChooser,否则 使用 GenericEventExecutorChooser。这两个 Chooser 都重写 next()方法。next()方法的主要功能就是讲数组索引循环位移.
    这个运算逻辑其实很简单,就是每次让索引自增后和数组长度取模:idx.getAndIncrement() % executors.length。但 是就连一个非常简单的数组索引运算,Netty 都帮我们做了优化。因为在计算机底层,&与比%运算效率更高。
    好了,分析到这里我们应该已经非常清楚 MultithreadEventExecutorGroup 中的处理逻辑,简单做一个总结:
    1、创建一个大小为 nThreads 的 SingleThreadEventExecutor 数组。
    2、根据 nThreads 的大小,创建不同的 Chooser,即如果 nThreads 是 2 的幂,则使用 PowerOfTwoEventExecutorChooser,反之使用 GenericEventExecutorChooser。不论使用哪个 Chooser,它们的功 能都是一样的,即从 children 数组中选出一个合适的 EventExecutor 实例。
    3、调用 newChhild()方法初始化 children 数组。

    根据上面的代码,我们也能知道:MultithreadEventExecutorGroup 内部维护了一个 EventExecutor 数组,而 Netty 的 EventLoopGroup的实现机制其实就建立在MultithreadEventExecutorGroup之上。每当Netty需要一个EventLoop 时, 会调用 next()方法获取一个可用的 EventLoop。
    上面代码的最后一部分是 newChild()方法,这个是一个抽象方法,它的任务是实例化 EventLoop 对象。我们跟踪一下 它的代码。可以发现。这个方法在 NioEventLoopGroup 类中有实现,其内容很简单:

    public class NioEventLoopGroup extends MultithreadEventLoopGroup {
        protected EventLoop newChild(Executor executor, Object... args) throws Exception {
            EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
            return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
        }
    }
    

    其实逻辑很简单就是实例化一个 NioEventLoop 对象, 然后返回 NioEventLoop 对象。

    最后总结一下整个 EventLoopGroup 的初始化过程:
    1、EventLoopGroup(其实是 MultithreadEventExecutorGroup)内部维护一个类型为 EventExecutor children 数组, 其大小是 nThreads,这样就构成了一个线程池。
    2、如果我们在实例化 NioEventLoopGroup 时,如果指定线程池大小,则 nThreads 就是指定的值,反之是处理 器核心数 * 2。
    3、MultithreadEventExecutorGroup 中会调用 newChild()抽象方法来初始化 children 数组。
    4、抽象方法 newChild()是在 NioEventLoopGroup 中实现的,它返回一个 NioEventLoop 实例。
    5、NioEventLoop 属性赋值:
    provider:在 NioEventLoopGroup 构造器中通过 SelectorProvider.provider()获取一个 SelectorProvider。
    selector:在 NioEventLoop 构造器中通过调用通过 provider.openSelector()方法获取一个 selector 对象。

    Channel 注册到 Selector

    在前面的分析中,我们提到 Channel 会在 Bootstrap 的 initAndRegister()中进行初始化,但是这个方法还会将初始 化好的 Channe 注册到 NioEventLoop 的 selector 中。接下来我们来分析一下 Channel 注册的过程。
    AbstractBootstrap 的 initAndRegister()方法:

        final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
                channel = channelFactory.newChannel();
                init(channel);
            } catch (Throwable t) {
                if (channel != null) {
                    // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                    channel.unsafe().closeForcibly();
                    // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                    return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
                }
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
            }
    
            ChannelFuture regFuture = config().group().register(channel);
            if (regFuture.cause() != null) {
                if (channel.isRegistered()) {
                    channel.close();
                } else {
                    channel.unsafe().closeForcibly();
                }
            }
    
            return regFuture;
        }
    

    当 Channel 初始化后,紧接着会调用 group().register()方法来向 selector 注册 Channel。我们继续跟踪的话,会 发现其调用链如下:

    通过跟踪调用链, 最终我们发现是调用到了 unsafe 的 register 方法,那么接下来我们就仔细看一下
    AbstractChannel$AbstractUnsafe.register()方法中到底做了什么?

    AbstractUnsafe

            public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    
    
                AbstractChannel.this.eventLoop = eventLoop;
                register0(promise);
            }
    

    首先,将 eventLoop 赋值给 Channel 的 eventLoop 属性,而我们知道这个 eventLoop 对象其实是 MultithreadEventLoopGroup 的 next()方法获取的,根据我们前面的分析,我们可以确定 next()方法返回的 eventLoop 对象是 NioEventLoop 实例。register()方法接着调用了 register0()方法:

            private void register0(ChannelPromise promise) {
                try {
                    // check if the channel is still open as it could be closed in the mean time when the register
                    // call was outside of the eventLoop
                    if (!promise.setUncancellable() || !ensureOpen(promise)) {
                        return;
                    }
                    boolean firstRegistration = neverRegistered;
                    doRegister();
                    neverRegistered = false;
                    registered = true;
    
                    // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                    // user may already fire events through the pipeline in the ChannelFutureListener.
                    pipeline.invokeHandlerAddedIfNeeded();
    
                    safeSetSuccess(promise);
                    pipeline.fireChannelRegistered();
                    // Only fire a channelActive if the channel has never been registered. This prevents firing
                    // multiple channel actives if the channel is deregistered and re-registered.
                    if (isActive()) {
                        if (firstRegistration) {
                            pipeline.fireChannelActive();
                        } else if (config().isAutoRead()) {
                            // This channel was registered before and autoRead() is set. This means we need to begin read
                            // again so that we process inbound data.
                            //
                            // See https://github.com/netty/netty/issues/4805
                            beginRead();
                        }
                    }
                } catch (Throwable t) {
                    // Close the channel directly to avoid FD leak.
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
    

    register0()方法又调用了 AbstractNioChannel 的 doRegister()方法:

        protected void doRegister() throws Exception {
            boolean selected = false;
            for (;;) {
                try {
                    selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                    return;
                } catch (CancelledKeyException e) {
                    if (!selected) {
                        // Force the Selector to select now as the "canceled" SelectionKey may still be
                        // cached and not removed because no Select.select(..) operation was called yet.
                        eventLoop().selectNow();
                        selected = true;
                    } else {
                        // We forced a select operation on the selector before but the SelectionKey is still cached
                        // for whatever reason. JDK bug ?
                        throw e;
                    }
                }
            }
        }
    

    看到 javaChannel()这个方法在前面我们已经知道了,它返回的是一个 Java NIO 的 SocketChannel 对象,这里我 们将这个 SocketChannel 注册到与 eventLoop 关联的 selector 上了。

    我们总结一下 Channel 的注册过程:

    1、首先在 AbstractBootstrap 的 initAndRegister()方法中, 通过 group().register(channel),调用 MultithreadEventLoopGroup 的 register()方法。

    2、在 MultithreadEventLoopGroup 的 register()中,调用 next()方法获取一个可用的 SingleThreadEventLoop, 然 后调用它的 register()方法。

    3、在 SingleThreadEventLoop 的 register()方法中,调用 channel.unsafe().register(this, promise)方法来获取channel 的 unsafe()底层操作对象,然后调用 unsafe 的 register()。

    4、在 AbstractUnsafe 的 register()方法中, 调用 register0()方法注册 Channel 对象。

    5、在 AbstractUnsafe 的 register0()方法中,调用 AbstractNioChannel 的 doRegister()方法。

    6、AbstractNioChannel 的 doRegister()方法通过 javaChannel().register(eventLoop().selector, 0, this)将 Channel 对应的 Java NIO 的 SocketChannel 注册到一个 eventLoop 的 selector 中,并且将当前 Channel 作为 attachment 与 SocketChannel 关联。

    总的来说,Channel 注册过程所做的工作就是将 Channel 与对应的 EventLoop 关联,因此这也体现了,在 Netty 中,每个 Channel 都会关联一个特定的 EventLoop,并且这个 Channel 中的所有 IO 操作都是在这个 EventLoop 中执 行的;当关联好 Channel 和 EventLoop 后,会继续调用底层 Java NIO 的 SocketChannel 对象的 register()方法,将底 层 Java NIO 的 SocketChannel 注册到指定的 selector 中。通过这两步,就完成了 Netty 对 Channel 的注册过程。

    NIO的弊端

    同步单线程

  • 相关阅读:
    第四周作业
    第四周上机作业
    java第十周上机练习
    java第九周上机练习
    第八周作业
    java第八周上机练习
    java第七周作业
    java第七周上机练习
    java第六周作业
    java上机练习 4.9
  • 原文地址:https://www.cnblogs.com/nangonghui/p/13290323.html
Copyright © 2011-2022 走看看