zoukankan      html  css  js  c++  java
  • NETTY篇 一篇文章看懂NETTY

    NIO

     近期接触了几个产品都触及NIO,要么应用,要么改造项目,听多了也有些了解,但仍然不能真正理解,工期比较赶,还是要潜心下来看看。    

     NIO是什么呢,NOT-BLOCKING IO,不阻塞的IO,字面上理解就是不用排队的IO。原生的BIO存在多个地方的阻塞,如服务端开启的accept(),read(),write(),NIO解决了这些问题吗,其实并不然。经过学习会发现,NIO在阻塞方面只是搞定了IO操作中的部分阻塞问题,将原本必须完全阻塞的读写方法改变为半阻塞方法,开启轮询模式,让复用路由器循环探测读写操作是否完成,如果未完成,线程就要去干别的事情了。

     虽然NIO叫同步非阻塞IO,但是他主要解决的应该是多线程开销问题,更高程度上利用系统资源,至于阻塞问题,只是缓解而已。

    NIO之于BIO

     其实我都想直接记录NETTY了,因为大家都想用最先进的东西。但是这中间的实现原理还是挺有趣的,也能方便以后理解NETTY的源码,还是有必要一步一步记录的。

     先上个图,表示一下,概念上明白一下NIO和BIO的不同

      

      可以看到,常规的多线程模式对一个连接都建立一个线程。但是NIO对每一个连接并没有去建立新的线程。BIO对每个连接都建立一个SocketChannel,然后将该对象注册到Selector上面。至于每个连接如何完成各自接下来的工作,只要研究Selector对象就行了。

      打开Selector类,查看类介绍和方法,我蛮贴一下类介绍(慎重打开),有兴趣的真可以看看(没兴趣直接跳过,没有影响)可以很快了解到工作原理:

    /**
     * A multiplexor of {@link SelectableChannel} objects.
     *
     * <p> A selector may be created by invoking the {@link #open open} method of
     * this class, which will use the system's default {@link
     * java.nio.channels.spi.SelectorProvider selector provider} to
     * create a new selector.  A selector may also be created by invoking the
     * {@link java.nio.channels.spi.SelectorProvider#openSelector openSelector}
     * method of a custom selector provider.  A selector remains open until it is
     * closed via its {@link #close close} method.
     *
     * <a name="ks"></a>
     *
     * <p> A selectable channel's registration with a selector is represented by a
     * {@link SelectionKey} object.  A selector maintains three sets of selection
     * keys:
     *
     * <ul>
     *
     *   <li><p> The <i>key set</i> contains the keys representing the current
     *   channel registrations of this selector.  This set is returned by the
     *   {@link #keys() keys} method. </p></li>
     *
     *   <li><p> The <i>selected-key set</i> is the set of keys such that each
     *   key's channel was detected to be ready for at least one of the operations
     *   identified in the key's interest set during a prior selection operation.
     *   This set is returned by the {@link #selectedKeys() selectedKeys} method.
     *   The selected-key set is always a subset of the key set. </p></li>
     *
     *   <li><p> The <i>cancelled-key</i> set is the set of keys that have been
     *   cancelled but whose channels have not yet been deregistered.  This set is
     *   not directly accessible.  The cancelled-key set is always a subset of the
     *   key set. </p></li>
     *
     * </ul>
     *
     * <p> All three sets are empty in a newly-created selector.
     *
     * <p> A key is added to a selector's key set as a side effect of registering a
     * channel via the channel's {@link SelectableChannel#register(Selector,int)
     * register} method.  Cancelled keys are removed from the key set during
     * selection operations.  The key set itself is not directly modifiable.
     *
     * <p> A key is added to its selector's cancelled-key set when it is cancelled,
     * whether by closing its channel or by invoking its {@link SelectionKey#cancel
     * cancel} method.  Cancelling a key will cause its channel to be deregistered
     * during the next selection operation, at which time the key will removed from
     * all of the selector's key sets.
     *
     * <a name="sks"></a><p> Keys are added to the selected-key set by selection
     * operations.  A key may be removed directly from the selected-key set by
     * invoking the set's {@link java.util.Set#remove(java.lang.Object) remove}
     * method or by invoking the {@link java.util.Iterator#remove() remove} method
     * of an {@link java.util.Iterator iterator} obtained from the
     * set.  Keys are never removed from the selected-key set in any other way;
     * they are not, in particular, removed as a side effect of selection
     * operations.  Keys may not be added directly to the selected-key set. </p>
     *
     *
     * <a name="selop"></a>
     * <h2>Selection</h2>
     *
     * <p> During each selection operation, keys may be added to and removed from a
     * selector's selected-key set and may be removed from its key and
     * cancelled-key sets.  Selection is performed by the {@link #select()}, {@link
     * #select(long)}, and {@link #selectNow()} methods, and involves three steps:
     * </p>
     *
     * <ol>
     *
     *   <li><p> Each key in the cancelled-key set is removed from each key set of
     *   which it is a member, and its channel is deregistered.  This step leaves
     *   the cancelled-key set empty. </p></li>
     *
     *   <li><p> The underlying operating system is queried for an update as to the
     *   readiness of each remaining channel to perform any of the operations
     *   identified by its key's interest set as of the moment that the selection
     *   operation began.  For a channel that is ready for at least one such
     *   operation, one of the following two actions is performed: </p>
     *
     *   <ol>
     *
     *     <li><p> If the channel's key is not already in the selected-key set then
     *     it is added to that set and its ready-operation set is modified to
     *     identify exactly those operations for which the channel is now reported
     *     to be ready.  Any readiness information previously recorded in the ready
     *     set is discarded.  </p></li>
     *
     *     <li><p> Otherwise the channel's key is already in the selected-key set,
     *     so its ready-operation set is modified to identify any new operations
     *     for which the channel is reported to be ready.  Any readiness
     *     information previously recorded in the ready set is preserved; in other
     *     words, the ready set returned by the underlying system is
     *     bitwise-disjoined into the key's current ready set. </p></li>
     *
     *   </ol>
     *
     *   If all of the keys in the key set at the start of this step have empty
     *   interest sets then neither the selected-key set nor any of the keys'
     *   ready-operation sets will be updated.
     *
     *   <li><p> If any keys were added to the cancelled-key set while step (2) was
     *   in progress then they are processed as in step (1). </p></li>
     *
     * </ol>
     *
     * <p> Whether or not a selection operation blocks to wait for one or more
     * channels to become ready, and if so for how long, is the only essential
     * difference between the three selection methods. </p>
     *
     *
     * <h2>Concurrency</h2>
     *
     * <p> Selectors are themselves safe for use by multiple concurrent threads;
     * their key sets, however, are not.
     *
     * <p> The selection operations synchronize on the selector itself, on the key
     * set, and on the selected-key set, in that order.  They also synchronize on
     * the cancelled-key set during steps (1) and (3) above.
     *
     * <p> Changes made to the interest sets of a selector's keys while a
     * selection operation is in progress have no effect upon that operation; they
     * will be seen by the next selection operation.
     *
     * <p> Keys may be cancelled and channels may be closed at any time.  Hence the
     * presence of a key in one or more of a selector's key sets does not imply
     * that the key is valid or that its channel is open.  Application code should
     * be careful to synchronize and check these conditions as necessary if there
     * is any possibility that another thread will cancel a key or close a channel.
     *
     * <p> A thread blocked in one of the {@link #select()} or {@link
     * #select(long)} methods may be interrupted by some other thread in one of
     * three ways:
     *
     * <ul>
     *
     *   <li><p> By invoking the selector's {@link #wakeup wakeup} method,
     *   </p></li>
     *
     *   <li><p> By invoking the selector's {@link #close close} method, or
     *   </p></li>
     *
     *   <li><p> By invoking the blocked thread's {@link
     *   java.lang.Thread#interrupt() interrupt} method, in which case its
     *   interrupt status will be set and the selector's {@link #wakeup wakeup}
     *   method will be invoked. </p></li>
     *
     * </ul>
     *
     * <p> The {@link #close close} method synchronizes on the selector and all
     * three key sets in the same order as in a selection operation.
     *
     * <a name="ksc"></a>
     *
     * <p> A selector's key and selected-key sets are not, in general, safe for use
     * by multiple concurrent threads.  If such a thread might modify one of these
     * sets directly then access should be controlled by synchronizing on the set
     * itself.  The iterators returned by these sets' {@link
     * java.util.Set#iterator() iterator} methods are <i>fail-fast:</i> If the set
     * is modified after the iterator is created, in any way except by invoking the
     * iterator's own {@link java.util.Iterator#remove() remove} method, then a
     * {@link java.util.ConcurrentModificationException} will be thrown. </p>
     *
     *
     * @author Mark Reinhold
     * @author JSR-51 Expert Group
     * @since 1.4
     *
     * @see SelectableChannel
     * @see SelectionKey
     */
    View Code

      大概在讲什么呢,大概要这么几点

         1.Selector用open()方法创建,当然也有其他办法,比如SelectorProvider类中的provider()方法等;

         2.Selector维护三个数据集,这个很重要。一个是keys set,记录的是注册的Channel所注册的操作,第二个是selected-keys set,记录的是已经探测确认要执行任务的操作,第三个是要取消的操作,cancel-keys set。最后一个集合没有提供访问方法。

         3.选择操作,每一个选择过程都可能伴随着key在set中的进出,有三个方法可供选择操作,select(),select(long),selectNow()。选择操作是否阻塞,以及阻塞多久时间,这是这三个方法的本质区别。

         4.并发性。Selector本身是线程安全的,但是其内在的几个控制set却未必。一个线程如果因为选择操作被阻塞,需要其他Selector唤醒或关闭。线程对于Set的操作需要加锁。

      其实写IO操作或者SOCKET代码的时候,观察线程会经常发现一个问题,很多线程在工作过程中很长时间是处于非就绪状态的。上述其实已经解释了NIO和BIO的差别,他通过不断去查看这些Set可以知道哪些是需要执行的操作,立马执行而不再浪费空间去阻塞。实现上是通过Selector类的selectKeys()获得就绪态的key,然后通过这些key找到对应的channle,执行相应的请求。

      这就是书里说的,BIO是一个连接一个线程,NIO是一个请求一个线程(没有请求,是不会让线程工作的)。

      看到这里,可以明白为什么说BIO能够解决线程扩展的开销问题,他使用线程更加节省,只有在发生请求的时候才工作,而且也降低了线程切换带来的开销,接着要搞明白他为什么称为非阻塞的。这要涉及到channel的概念,继续打开Channel类(慎重打开)

    /**
     * A nexus for I/O operations.
     *
     * <p> A channel represents an open connection to an entity such as a hardware
     * device, a file, a network socket, or a program component that is capable of
     * performing one or more distinct I/O operations, for example reading or
     * writing.
     *
     * <p> A channel is either open or closed.  A channel is open upon creation,
     * and once closed it remains closed.  Once a channel is closed, any attempt to
     * invoke an I/O operation upon it will cause a {@link ClosedChannelException}
     * to be thrown.  Whether or not a channel is open may be tested by invoking
     * its {@link #isOpen isOpen} method.
     *
     * <p> Channels are, in general, intended to be safe for multithreaded access
     * as described in the specifications of the interfaces and classes that extend
     * and implement this interface.
     *
     *
     * @author Mark Reinhold
     * @author JSR-51 Expert Group
     * @since 1.4
     */
    View Code

      大概这么几点:

         1.一个channel代表一个通道,可以是硬件设备,文件,网络,程序元素(FileChannel,DatagramChannel,SocketChannel,ServerSocketChannel)各种通道,而且可以执行多种操作,读和写(这与Stream的操作不相同,String只能执行一项操作,毕竟单向流)

         2.其他特点其实在其他的实现类中有,比如FileChannle中写了channle对于传统IO操作所对应的对Buffer的操作。对于Buffer的研究不是本次记录重点,但是这个概念也比较重要,所以我贴了一位同学的研究成果作为记录: https://blog.csdn.net/dkfajsldfsdfsd/article/details/89225098

    BUFFER

      Buffer在NIO中是另一个极其重要的概念,非常关键。在NIO模式下,用户主要与channel与buffer打交道,相对来说与buffer打交道的机会更多一些。Channel相当于是一个双向通道,将后端的数据源如文件、socket等与buffer连接起来。一旦通道建立完成,其它时间则主要与buffer打交道,此时可以把buffer看成是数据源的代理或者是用户与数据源之间的中间商,操作buffer就相当于间接操作数据源,并且操作是非阻塞的。

      Buffer首先是内存中的一块存储空间,当然它不可能只是一块单纯的内存,为了协调数据源与用户,NIO中的buffer封装了一些特有的成员与方法。

    Basic Buffer Usage

      使用buffer读写数据,无论是读还是写都涉及四个具体的小步骤,下边分别详细说明。

    读操作

      向buffer写入数据。这一步通过调用channel的read()方法实现,例如本文第二节中出现的代码:bytesRead = inChannel.read(buf);,要求channel从它后端的数据源中read出数据并写入buffer,注意channel的非阻塞特性,此时buffer处于写模式。
      调用buffer的flip()方法,实现写模式到读模式的转换,表示用户接下来将从buffer中读数据。
      用户从buffer中将数据读出,例如调用buffer的get()方法等。
      调用buffer的clear()方法,彻底清空buffer中的数据,既使buffer中存在着用户尚未读出来的数据。或者compact()方法,只清空已经读出来的数据,未读出的数据则继续保留并移动到buffer的开始处。假如我们还需要继续从文件读数据,则继续调用channel的read()方法,并循环以上过程,只到在某个条件满足退出循环。

    写操作

      用户向buffer中写入或者追加数据,此时buffer处于写模式。
      调用buffer的flip()方法,实现写模式到读模式的转换,表示接下来会要求channel将buffer中数据读出并写入到后端的数据源。
      从buffer中读出数据。这一步通过调用channel的write()方法实现,这要求channel从buffer中读出数据并写入到后端的数据源,注意channel的非阻塞特性。
      用户判断buffer的状态,确认写入的进度,如buffer中的数据有多少已经被channel写入到后端,有多少尚未写入。用户可持续这四个步骤,直到写入全部数据。
      读操作的简单代码可参考第二节。

    原理

      要理解buffer如何工作,需要熟悉它的三个属性。

      capacity
      position
      limit
      Capacity表示buffer的容量,容量在创建buffer时指定,它是一成不变的。通过前边的介绍可知,buffer有模式,通过flip()方法可在读、写模式之间切换。模式不同,则position与limit的含义也不同。先看一张图,稍后解释它。

                                          Java NIO: Buffer capacity, position and limit in write and read mode.

      看一下上图中右边的图,它表示写模式下的buffer。此时,postion代表下一个可以写入的位置,最开始时这个值是0,随着数据的增加position会持续变大。而limit此时与capacity相同,表示position的最大值限制,postion不能超过limit的限制。

      将buffer由写模式切换到读模式后,postion与limit的含义及值均发生变化。postion指向buffer开始的位置,也就是0。而limit则指向写模式下的postion的位置。随着读操作的进行,postion的值增加,但它一定小于limit的值,因为limit及其以后的空间是无效的,还没有写入数据。

      假如再将buffer由读模式切换到写模式,有三种可能。

        如果在读模式下buffer中的数据即没有通过clear()方法彻底清空,也没有通过compact()方法将已经读取的数据清空,则buffer切换回写模式后什么都不会变,相当于没有切换。
        如果在读模式下buffer中的数据通过clear()方法彻底清空,则切换回写模式后,postion将指向0的位置,相当于这是一个空的buffer。
        如果在读模式下buffer中的数据通过调用compact()方法,只是将已经读取过的数据清空。则切换回写模式后,buffer中剩余的未读数据将会向buffer的顶端移动,同时postion的位置也会向顶端方向移动,也就是未读数据仍然保留。

      遗憾的是BIO探索就到此结束了,因为我在实现类中发现了异步的实现类,大概就是传闻的AIO。

    AIO之于BIO

      打开该类的实现类,发现有两个类型的channel实现了异步,分别是AsynchronousChannel接口和AysnchronousFileChannel类,再看AsynchronousChannel又会发现有几个类实现了该接口

      顺便看一下AysnchronousFileChannel介绍:

    /**
     * An asynchronous channel for reading, writing, and manipulating a file.
     *
     * <p> An asynchronous file channel is created when a file is opened by invoking
     * one of the {@link #open open} methods defined by this class. The file contains
     * a variable-length sequence of bytes that can be read and written and whose
     * current size can be {@link #size() queried}. The size of the file increases
     * when bytes are written beyond its  current size; the size of the file decreases
     * when it is {@link #truncate truncated}.
     *
     * <p> An asynchronous file channel does not have a <i>current position</i>
     * within the file. Instead, the file position is specified to each read and
     * write method that initiates asynchronous operations. A {@link CompletionHandler}
     * is specified as a parameter and is invoked to consume the result of the I/O
     * operation. This class also defines read and write methods that initiate
     * asynchronous operations, returning a {@link Future} to represent the pending
     * result of the operation. The {@code Future} may be used to check if the
     * operation has completed, wait for its completion, and retrieve the result.
     *
     * <p> In addition to read and write operations, this class defines the
     * following operations: </p>
     *
     * <ul>
     *
     *   <li><p> Updates made to a file may be {@link #force <i>forced
     *   out</i>} to the underlying storage device, ensuring that data are not
     *   lost in the event of a system crash.  </p></li>
     *
     *   <li><p> A region of a file may be {@link #lock <i>locked</i>} against
     *   access by other programs.  </p></li>
     *
     * </ul>
     *
     * <p> An {@code AsynchronousFileChannel} is associated with a thread pool to
     * which tasks are submitted to handle I/O events and dispatch to completion
     * handlers that consume the results of I/O operations on the channel. The
     * completion handler for an I/O operation initiated on a channel is guaranteed
     * to be invoked by one of the threads in the thread pool (This ensures that the
     * completion handler is run by a thread with the expected <em>identity</em>).
     * Where an I/O operation completes immediately, and the initiating thread is
     * itself a thread in the thread pool, then the completion handler may be invoked
     * directly by the initiating thread. When an {@code AsynchronousFileChannel} is
     * created without specifying a thread pool then the channel is associated with
     * a system-dependent default thread pool that may be shared with other
     * channels. The default thread pool is configured by the system properties
     * defined by the {@link AsynchronousChannelGroup} class.
     *
     * <p> Channels of this type are safe for use by multiple concurrent threads. The
     * {@link Channel#close close} method may be invoked at any time, as specified
     * by the {@link Channel} interface. This causes all outstanding asynchronous
     * operations on the channel to complete with the exception {@link
     * AsynchronousCloseException}. Multiple read and write operations may be
     * outstanding at the same time. When multiple read and write operations are
     * outstanding then the ordering of the I/O operations, and the order that the
     * completion handlers are invoked, is not specified; they are not, in particular,
     * guaranteed to execute in the order that the operations were initiated. The
     * {@link java.nio.ByteBuffer ByteBuffers} used when reading or writing are not
     * safe for use by multiple concurrent I/O operations. Furthermore, after an I/O
     * operation is initiated then care should be taken to ensure that the buffer is
     * not accessed until after the operation has completed.
     *
     * <p> As with {@link FileChannel}, the view of a file provided by an instance of
     * this class is guaranteed to be consistent with other views of the same file
     * provided by other instances in the same program.  The view provided by an
     * instance of this class may or may not, however, be consistent with the views
     * seen by other concurrently-running programs due to caching performed by the
     * underlying operating system and delays induced by network-filesystem protocols.
     * This is true regardless of the language in which these other programs are
     * written, and whether they are running on the same machine or on some other
     * machine.  The exact nature of any such inconsistencies are system-dependent
     * and are therefore unspecified.
     *
     * @since 1.7
     */
    View Code

      重点大概这么几点

          1.fileChannel打开一个文件后,将读写的工作交给channle对象。

          2.读写操作会返回一个future,这个future将被查询是否完成了工作,这就是异步工作的原理了。

      贴一下两个方法:

      read: public abstract Future<Integer> read(ByteBuffer dst, long position);

      /**
         * Reads a sequence of bytes from this channel into the given buffer,
         * starting at the given file position.
         *
         * <p> This method initiates the reading of a sequence of bytes from this
         * channel into the given buffer, starting at the given file position. This
         * method returns a {@code Future} representing the pending result of the
         * operation. The {@code Future}'s {@link Future#get() get} method returns
         * the number of bytes read or {@code -1} if the given position is greater
         * than or equal to the file's size at the time that the read is attempted.
         *
         * <p> This method works in the same manner as the {@link
         * AsynchronousByteChannel#read(ByteBuffer)} method, except that bytes are
         * read starting at the given file position. If the given file position is
         * greater than the file's size at the time that the read is attempted then
         * no bytes are read.
         *
         * @param   dst
         *          The buffer into which bytes are to be transferred
         * @param   position
         *          The file position at which the transfer is to begin;
         *          must be non-negative
         *
         * @return  A {@code Future} object representing the pending result
         *
         * @throws  IllegalArgumentException
         *          If the position is negative or the buffer is read-only
         * @throws  NonReadableChannelException
         *          If this channel was not opened for reading
         */
        public abstract Future<Integer> read(ByteBuffer dst, long position);
    View Code

      write:public abstract Future<Integer> wriet(ByteBuffer src, long position);

     /**
         * Writes a sequence of bytes to this channel from the given buffer, starting
         * at the given file position.
         *
         * <p> This method initiates the writing of a sequence of bytes to this
         * channel from the given buffer, starting at the given file position. The
         * method returns a {@code Future} representing the pending result of the
         * write operation. The {@code Future}'s {@link Future#get() get} method
         * returns the number of bytes written.
         *
         * <p> This method works in the same manner as the {@link
         * AsynchronousByteChannel#write(ByteBuffer)} method, except that bytes are
         * written starting at the given file position. If the given position is
         * greater than the file's size, at the time that the write is attempted,
         * then the file will be grown to accommodate the new bytes; the values of
         * any bytes between the previous end-of-file and the newly-written bytes
         * are unspecified.
         *
         * @param   src
         *          The buffer from which bytes are to be transferred
         * @param   position
         *          The file position at which the transfer is to begin;
         *          must be non-negative
         *
         * @return  A {@code Future} object representing the pending result
         *
         * @throws  IllegalArgumentException
         *          If the position is negative
         * @throws  NonWritableChannelException
         *          If this channel was not opened for writing
         */
        public abstract Future<Integer> write(ByteBuffer src, long position);
    View Code

       这两个方法都有另外一个孪生方法,可以传入一个完成时执行的操作,该方法将读写操作全部交予channel。

      AIO是真正的异步模式,线程向系统发起异步IO操作,整个操作的完成都由系统负责。线程可以等待异步IO的完成,然后再执行后续的操作,这个称为“将来式”。另一种是线程发起异步IO时,同时指明当IO异步操作完成时的后续处理,这个是“回调式”。“回调式”需要线程池的配置,当系统完成异步IO后,将线程指定的后续处理当成一个任务提交给线程池,线程池中的线程空闲时提取任务,执行其中的处理。

    IO总结

      IO 都是同步阻塞模式,所以需要多线程以实现多任务处理。而 NIO 则是利用了单线程轮询事件的机制,通过高效地定位就绪的 Channel,来决定做什么,仅仅 select 阶段是阻塞的,可以有效避免大量客户端连接时,频繁线程切换带来的问题,应用的扩展能力有了非常大的提高

      但是对AIO来说,则更加进了一步,它不是在IO准备好时再通知线程,而是在IO操作已经完成后,再给线程发出通知。因此AIO是不会阻塞的,此时我们的业务逻辑将变成一个回调函数,等待IO操作完成后,由系统自动触发。

    NETTY

      上述讲完NIO和AIO,NETTY也就一步到位算了。

      NETTY是什么,我们一般说到微服务通信,响应式编程的时候会提到它,Netty是一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能服务器和客户端。他实现的原理核心就是NIO。

      到目前为止,我对NETTY的了解其实也很有限,因为涉及到有真正应用NETTY的项目都没有上线,能够直观感受到的时候NETTY对NIO的接口封装之后,使用起来会方便得多。下面先用一张数据流图和一个例子来说明NETTY,至于更详细的学习记录,以后再出。

        

        图片来源:https://www.cnblogs.com/imstudy/p/9908791.html

        图中可以识别出几个关键点:

          1.初始化创建 2 个 NioEventLoopGroup:其中 boosGroup 用于 Accetpt 连接建立事件并分发请求,workerGroup 用于处理 I/O 读写事件和业务逻辑。

          2.NioEventLoopGroup 相当于 1 个事件循环组,这个组里包含多个事件循环 NioEventLoop,每个 NioEventLoop 包含 1 个 Selector 和 1 个事件循环线程。

          每个 Boss NioEventLoop 循环执行的任务包含 3 步:

            1)轮询 Accept 事件;

            2)处理 Accept I/O 事件,与 Client 建立连接,生成 NioSocketChannel,并将 NioSocketChannel 注册到某个 Worker NioEventLoop 的 Selector 上;

            3)处理任务队列中的任务,runAllTasks。任务队列中的任务包括用户调用 eventloop.execute 或 schedule 执行的任务,或者其他线程提交到该 eventloop 的任务。

          每个 Worker NioEventLoop 循环执行的任务包含 3 步:

            1)轮询 Read、Write 事件;

            2)处理 I/O 事件,即 Read、Write 事件,在 NioSocketChannel 可读、可写事件发生时进行处理;

            3)处理任务队列中的任务,runAllTasks。

    例子

      最后给一个NETTY的例子,例子来源: https://www.jianshu.com/p/a4e03835921a

      NettyClient.java

    /**
     * @author 闪电侠
     */
    public class NettyClient {
        public static void main(String[] args) throws InterruptedException {
            Bootstrap bootstrap = new Bootstrap();
            NioEventLoopGroup group = new NioEventLoopGroup();
    
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel ch) {
                            ch.pipeline().addLast(new StringEncoder());
                        }
                    });
    
            Channel channel = bootstrap.connect("127.0.0.1", 8000).channel();
    
            while (true) {
                channel.writeAndFlush(new Date() + ": hello world!");
                Thread.sleep(2000);
            }
        }
    }
    View Code

      NettyServer.java

    /**
     * @author 闪电侠
     */
    public class NettyServer {
        public static void main(String[] args) {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
    
            NioEventLoopGroup boos = new NioEventLoopGroup();
            NioEventLoopGroup worker = new NioEventLoopGroup();
            serverBootstrap
                    .group(boos, worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        protected void initChannel(NioSocketChannel ch) {
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                                @Override
                                protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                                    System.out.println(msg);
                                }
                            });
                        }
                    })
                    .bind(8000);
        }
    }
    View Code

      未完待续...

  • 相关阅读:
    【NOI2008】志愿者招募
    【2010国家集训队】人员雇佣
    html5手机移动端三级联动城市选择器
    WebSocket实现简单的在线聊天
    游戏开发完整学习路线(各个版本都有)
    vs下开发windows服务程序
    解决Firefox下,页面元素不刷新问题
    C# JObject和JArray 的分享
    jQuery如何改变css伪元素样式
    safari 浏览器window.history.go(-1)运行无效解决办法
  • 原文地址:https://www.cnblogs.com/garfieldcgf/p/11867211.html
Copyright © 2011-2022 走看看