NIO
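Several products I have worked with recently touch on NIO, either using it directly or being migrated to it. I have heard enough to have a rough picture, but I still do not really understand it. The schedule is tight, but it is worth sitting down and studying it properly.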
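What is NIO? It is usually read as non-blocking I/O (in the JDK the name originally stood for "New I/O"); taken literally, I/O that does not have to wait in line. Traditional BIO blocks in several places, such as accept(), read(), and write() on the server side. Does NIO solve all of these? Not quite. On closer study, NIO only removes part of the blocking in an I/O operation: reads and writes that used to block until completion become "half-blocking". A multiplexer (the Selector) polls in a loop to detect which channels are ready, and if an operation is not ready, the thread goes off to do other work.
Although NIO is described as synchronous non-blocking I/O, what it mainly addresses is the cost of using many threads, so that system resources are used more fully. As for blocking, it only alleviates the problem.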
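To make "half-blocking" concrete, here is a minimal sketch of my own, not taken from any of the projects mentioned. It uses a java.nio.channels.Pipe as a stand-in for a network connection; the class name NonBlockingReadSketch and the 5-second timeout are arbitrary choices for illustration. A read on a non-blocking channel returns at once when no data is available, and the Selector is what tells the thread when a read will succeed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class NonBlockingReadSketch {

    /** Returns {bytes read before any data exists, bytes read after the selector wakes up}. */
    static int[] demo() {
        try {
            Pipe pipe = Pipe.open();
            try (Selector selector = Selector.open();
                 Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);

                ByteBuffer buf = ByteBuffer.allocate(16);
                // Nothing has been written yet: the call returns immediately instead of waiting.
                int before = source.read(buf);

                sink.write(ByteBuffer.wrap(new byte[] {1, 2, 3}));
                // Only this call blocks, and only until the source becomes readable (or 5 s pass).
                selector.select(5000);
                int after = source.read(buf);
                return new int[] {before, after};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("before=" + r[0] + " after=" + r[1]);
    }
}
```

The only place the thread waits is select(), which matches the point above: the blocking is reduced and moved to one place, not eliminated.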
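NIO versus BIO
I was tempted to go straight to Netty, since everyone wants to use the most advanced tool. But the mechanism underneath is interesting in its own right and will make Netty's source code easier to follow later, so it is worth recording step by step.
First, a diagram to show conceptually how NIO and BIO differ.
As the diagram shows, the conventional multi-threaded model creates one thread per connection. NIO does not create a new thread for each connection. Instead, NIO creates a SocketChannel for each connection and registers it with a Selector. To see how each connection then gets its work done, it is enough to study the Selector.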
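Open the Selector class and look at its class description and methods. I am pasting the class Javadoc below (open with care, it is long). It is genuinely worth reading if you are interested, since it explains the working principle quickly; if not, skip it without losing anything: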
/** * A multiplexor of {@link SelectableChannel} objects. * * <p> A selector may be created by invoking the {@link #open open} method of * this class, which will use the system's default {@link * java.nio.channels.spi.SelectorProvider selector provider} to * create a new selector. A selector may also be created by invoking the * {@link java.nio.channels.spi.SelectorProvider#openSelector openSelector} * method of a custom selector provider. A selector remains open until it is * closed via its {@link #close close} method. * * <a name="ks"></a> * * <p> A selectable channel's registration with a selector is represented by a * {@link SelectionKey} object. A selector maintains three sets of selection * keys: * * <ul> * * <li><p> The <i>key set</i> contains the keys representing the current * channel registrations of this selector. This set is returned by the * {@link #keys() keys} method. </p></li> * * <li><p> The <i>selected-key set</i> is the set of keys such that each * key's channel was detected to be ready for at least one of the operations * identified in the key's interest set during a prior selection operation. * This set is returned by the {@link #selectedKeys() selectedKeys} method. * The selected-key set is always a subset of the key set. </p></li> * * <li><p> The <i>cancelled-key</i> set is the set of keys that have been * cancelled but whose channels have not yet been deregistered. This set is * not directly accessible. The cancelled-key set is always a subset of the * key set. </p></li> * * </ul> * * <p> All three sets are empty in a newly-created selector. * * <p> A key is added to a selector's key set as a side effect of registering a * channel via the channel's {@link SelectableChannel#register(Selector,int) * register} method. Cancelled keys are removed from the key set during * selection operations. The key set itself is not directly modifiable. 
* * <p> A key is added to its selector's cancelled-key set when it is cancelled, * whether by closing its channel or by invoking its {@link SelectionKey#cancel * cancel} method. Cancelling a key will cause its channel to be deregistered * during the next selection operation, at which time the key will removed from * all of the selector's key sets. * * <a name="sks"></a><p> Keys are added to the selected-key set by selection * operations. A key may be removed directly from the selected-key set by * invoking the set's {@link java.util.Set#remove(java.lang.Object) remove} * method or by invoking the {@link java.util.Iterator#remove() remove} method * of an {@link java.util.Iterator iterator} obtained from the * set. Keys are never removed from the selected-key set in any other way; * they are not, in particular, removed as a side effect of selection * operations. Keys may not be added directly to the selected-key set. </p> * * * <a name="selop"></a> * <h2>Selection</h2> * * <p> During each selection operation, keys may be added to and removed from a * selector's selected-key set and may be removed from its key and * cancelled-key sets. Selection is performed by the {@link #select()}, {@link * #select(long)}, and {@link #selectNow()} methods, and involves three steps: * </p> * * <ol> * * <li><p> Each key in the cancelled-key set is removed from each key set of * which it is a member, and its channel is deregistered. This step leaves * the cancelled-key set empty. </p></li> * * <li><p> The underlying operating system is queried for an update as to the * readiness of each remaining channel to perform any of the operations * identified by its key's interest set as of the moment that the selection * operation began. 
For a channel that is ready for at least one such * operation, one of the following two actions is performed: </p> * * <ol> * * <li><p> If the channel's key is not already in the selected-key set then * it is added to that set and its ready-operation set is modified to * identify exactly those operations for which the channel is now reported * to be ready. Any readiness information previously recorded in the ready * set is discarded. </p></li> * * <li><p> Otherwise the channel's key is already in the selected-key set, * so its ready-operation set is modified to identify any new operations * for which the channel is reported to be ready. Any readiness * information previously recorded in the ready set is preserved; in other * words, the ready set returned by the underlying system is * bitwise-disjoined into the key's current ready set. </p></li> * * </ol> * * If all of the keys in the key set at the start of this step have empty * interest sets then neither the selected-key set nor any of the keys' * ready-operation sets will be updated. * * <li><p> If any keys were added to the cancelled-key set while step (2) was * in progress then they are processed as in step (1). </p></li> * * </ol> * * <p> Whether or not a selection operation blocks to wait for one or more * channels to become ready, and if so for how long, is the only essential * difference between the three selection methods. </p> * * * <h2>Concurrency</h2> * * <p> Selectors are themselves safe for use by multiple concurrent threads; * their key sets, however, are not. * * <p> The selection operations synchronize on the selector itself, on the key * set, and on the selected-key set, in that order. They also synchronize on * the cancelled-key set during steps (1) and (3) above. * * <p> Changes made to the interest sets of a selector's keys while a * selection operation is in progress have no effect upon that operation; they * will be seen by the next selection operation. 
* * <p> Keys may be cancelled and channels may be closed at any time. Hence the * presence of a key in one or more of a selector's key sets does not imply * that the key is valid or that its channel is open. Application code should * be careful to synchronize and check these conditions as necessary if there * is any possibility that another thread will cancel a key or close a channel. * * <p> A thread blocked in one of the {@link #select()} or {@link * #select(long)} methods may be interrupted by some other thread in one of * three ways: * * <ul> * * <li><p> By invoking the selector's {@link #wakeup wakeup} method, * </p></li> * * <li><p> By invoking the selector's {@link #close close} method, or * </p></li> * * <li><p> By invoking the blocked thread's {@link * java.lang.Thread#interrupt() interrupt} method, in which case its * interrupt status will be set and the selector's {@link #wakeup wakeup} * method will be invoked. </p></li> * * </ul> * * <p> The {@link #close close} method synchronizes on the selector and all * three key sets in the same order as in a selection operation. * * <a name="ksc"></a> * * <p> A selector's key and selected-key sets are not, in general, safe for use * by multiple concurrent threads. If such a thread might modify one of these * sets directly then access should be controlled by synchronizing on the set * itself. The iterators returned by these sets' {@link * java.util.Set#iterator() iterator} methods are <i>fail-fast:</i> If the set * is modified after the iterator is created, in any way except by invoking the * iterator's own {@link java.util.Iterator#remove() remove} method, then a * {@link java.util.ConcurrentModificationException} will be thrown. </p> * * * @author Mark Reinhold * @author JSR-51 Expert Group * @since 1.4 * * @see SelectableChannel * @see SelectionKey */
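What is it saying? Roughly these points:
1. A Selector is created with the open() method. There are other ways too, such as calling openSelector() on a custom SelectorProvider;
2. A Selector maintains three sets of keys, which is important. The first is the key set, holding the keys that represent the channels currently registered with the selector. The second is the selected-key set, holding the keys whose channels were detected as ready for at least one operation of interest. The third is the cancelled-key set, holding keys that have been cancelled but whose channels are not yet deregistered. The last set is not directly accessible.
3. Selection. Each selection operation may move keys into and out of these sets. Three methods perform selection: select(), select(long), and selectNow(). Whether the call blocks, and for how long, is the essential difference between them.
4. Concurrency. A Selector itself is thread-safe, but its key sets are not. A thread blocked in a selection operation is released when another thread calls the selector's wakeup() or close(), or interrupts the blocked thread. Code that modifies a key set from several threads must synchronize on that set.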
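The behaviour of the key sets described in points 1 to 3 can be observed directly. The following is a small sketch under my own assumptions (the class name SelectorKeySets and the use of an ephemeral loopback port are made up for the demo); it registers one ServerSocketChannel and records the size of the key set and the selected-key set at each stage.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

class SelectorKeySets {

    /** Returns {keys before register, keys after register, selected keys, keys after cancel}. */
    static int[] demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            int before = selector.keys().size();          // a new selector has empty sets

            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: any free port
            server.configureBlocking(false);              // required before register()
            SelectionKey key = server.register(selector, SelectionKey.OP_ACCEPT);
            int afterRegister = selector.keys().size();   // registration adds a key

            selector.selectNow();                         // non-blocking selection, no client yet
            int selected = selector.selectedKeys().size();

            key.cancel();                                 // goes to the cancelled-key set first
            selector.selectNow();                         // the next selection removes it
            int afterCancel = selector.keys().size();

            return new int[] {before, afterRegister, selected, afterCancel};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo()));
    }
}
```

Note that the cancelled key only leaves the key set during the following selection operation, which is exactly what the Javadoc describes as step (1) of selection.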
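When writing I/O or socket code and watching the threads, you often notice that many of them spend a long time not runnable. The points above already explain the difference between NIO and BIO: by repeatedly checking these sets, NIO knows which operations are ready and executes them right away, instead of parking a thread in a blocking call. In code, you obtain the ready keys from the Selector's selectedKeys() method, then use each key to find its channel and handle the request.
This is what the books mean by: BIO is one thread per connection, NIO is one thread per request (with no request, no thread is put to work).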
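The select, selectedKeys(), key-to-channel sequence described above can be sketched as a tiny single-threaded echo round trip. This is only an illustration under assumptions of my own: the class name SelectorEchoSketch, the loopback address, the 256-byte buffer and the 5-second timeout are all invented for the demo, and a blocking client is created in the same thread simply to generate traffic.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

class SelectorEchoSketch {

    /** Sends msg (non-empty) through a one-thread selector loop and returns the echoed text. */
    static String echoOnce(String msg) {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: any free port
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);

            // A plain blocking client, present only to produce a connection and some bytes.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                byte[] payload = msg.getBytes(StandardCharsets.UTF_8);
                client.write(ByteBuffer.wrap(payload));

                int echoed = 0;
                while (echoed < payload.length) {
                    if (selector.select(5000) == 0) {
                        throw new IOException("no channel became ready in time");
                    }
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove(); // keys are never removed from this set automatically
                        if (key.isAcceptable()) {
                            SocketChannel conn = server.accept();
                            if (conn != null) {
                                conn.configureBlocking(false);
                                conn.register(selector, SelectionKey.OP_READ);
                            }
                        } else if (key.isReadable()) {
                            SocketChannel conn = (SocketChannel) key.channel();
                            ByteBuffer buf = ByteBuffer.allocate(256);
                            int n = conn.read(buf);
                            if (n < 0) {          // peer closed the connection
                                conn.close();
                                continue;
                            }
                            buf.flip();
                            while (buf.hasRemaining()) {
                                conn.write(buf);  // echo back what was read
                            }
                            echoed += n;
                            if (echoed >= payload.length) {
                                conn.close();     // done with this connection
                            }
                        }
                    }
                }

                ByteBuffer reply = ByteBuffer.allocate(payload.length);
                while (reply.hasRemaining() && client.read(reply) >= 0) {
                    // keep reading until the whole echo has arrived
                }
                return new String(reply.array(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(echoOnce("ping"));
    }
}
```

One thread serves both the accept and the read here; a real server would keep looping and would also have to handle partial writes through OP_WRITE, which this sketch leaves out.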
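At this point it is clear why NIO is said to solve the cost of scaling threads: it uses threads more sparingly, putting them to work only when a request arrives, and it reduces the overhead of thread switching. Next we need to understand why it is called non-blocking. That involves the concept of a channel, so open the Channel interface next (open with care):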
/** * A nexus for I/O operations. * * <p> A channel represents an open connection to an entity such as a hardware * device, a file, a network socket, or a program component that is capable of * performing one or more distinct I/O operations, for example reading or * writing. * * <p> A channel is either open or closed. A channel is open upon creation, * and once closed it remains closed. Once a channel is closed, any attempt to * invoke an I/O operation upon it will cause a {@link ClosedChannelException} * to be thrown. Whether or not a channel is open may be tested by invoking * its {@link #isOpen isOpen} method. * * <p> Channels are, in general, intended to be safe for multithreaded access * as described in the specifications of the interfaces and classes that extend * and implement this interface. * * * @author Mark Reinhold * @author JSR-51 Expert Group * @since 1.4 */
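Roughly these points:
1. A channel represents a connection to an entity such as a hardware device, a file, a network socket, or a program component (FileChannel, DatagramChannel, SocketChannel, ServerSocketChannel, and so on), and it can perform more than one kind of operation, both reading and writing. (This differs from a Stream, which supports only one of the two because it is one-directional.)
2. Other characteristics are documented in the implementing classes. For example, FileChannel describes how the traditional I/O operations map to operations on a Buffer. Buffers are not the focus of this note, but the concept is important, so I am quoting another author's write-up here for the record: https://blog.csdn.net/dkfajsldfsdfsd/article/details/89225098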
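As a quick illustration of point 1, the sketch below writes to and then reads from the same FileChannel, something a single InputStream or OutputStream cannot do. The class name ChannelBothWays and the temp-file prefix are placeholders of mine.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class ChannelBothWays {

    /** Writes text to a temp file and reads it back through the same channel. */
    static String roundTrip(String text) {
        try {
            Path tmp = Files.createTempFile("channel-demo", ".txt");
            try (FileChannel ch = FileChannel.open(tmp,
                    StandardOpenOption.READ,
                    StandardOpenOption.WRITE,
                    StandardOpenOption.DELETE_ON_CLOSE)) {
                ByteBuffer out = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
                while (out.hasRemaining()) {
                    ch.write(out);            // one direction: buffer -> file
                }

                ch.position(0);               // rewind the channel to the start of the file
                ByteBuffer in = ByteBuffer.allocate((int) ch.size());
                while (in.hasRemaining() && ch.read(in) >= 0) {
                    // other direction: file -> buffer
                }
                return new String(in.array(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello nio"));
    }
}
```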
BUFFER
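Buffer is another extremely important concept in NIO. In NIO the user mostly deals with channels and buffers, and with buffers more often than with channels. A channel acts as a two-way pipe connecting a backing data source, such as a file or a socket, to a buffer. Once the channel is set up, most of the remaining work is done on the buffer. You can think of the buffer as a proxy for the data source, or a middleman between the user and the data source: operating on the buffer indirectly operates on the data source, and the operation is non-blocking.
A buffer is first of all a block of memory, but it cannot be just raw memory: to coordinate between the data source and the user, an NIO buffer wraps that memory with some dedicated fields and methods.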
Basic Buffer Usage
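Reading or writing data through a buffer takes four small steps in either direction, described in detail below.
Read operation
1. Write data into the buffer. This is done by calling the channel's read() method, for example bytesRead = inChannel.read(buf); from section 2 of the quoted article. It asks the channel to read data from its backing data source and write it into the buffer. Note the channel's non-blocking nature. At this point the buffer is in write mode.
2. Call the buffer's flip() method to switch from write mode to read mode, indicating that the user will now read from the buffer.
3. Read the data out of the buffer, for example by calling the buffer's get() method.
4. Call the buffer's clear() method, which discards everything in the buffer, even data the user has not read yet (it resets position and limit rather than erasing memory). Alternatively call compact(), which discards only the data already read; the unread data is kept and moved to the start of the buffer. If more data needs to be read from the file, call the channel's read() method again and repeat the cycle until some exit condition is met.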
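The four read steps above can be sketched as follows. This is my own minimal example, not the code from the quoted article: the class name BufferReadCycle is hypothetical, and an in-memory ByteArrayInputStream wrapped with Channels.newChannel stands in for a file or socket so the example is self-contained. The buffer is deliberately tiny so that the loop runs several times.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

class BufferReadCycle {

    /** Drains the channel through an 8-byte buffer and returns everything read as text. */
    static String readAll(ReadableByteChannel channel) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        ByteArrayOutputStream collected = new ByteArrayOutputStream();
        try {
            while (channel.read(buf) != -1) {      // step 1: channel writes into the buffer
                buf.flip();                        // step 2: switch to read mode
                while (buf.hasRemaining()) {
                    collected.write(buf.get());    // step 3: user reads from the buffer
                }
                buf.clear();                       // step 4: make the buffer writable again
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(collected.toByteArray(), StandardCharsets.UTF_8);
    }

    /** Runs readAll against an in-memory source. */
    static String demo() {
        byte[] data = "read through a small buffer".getBytes(StandardCharsets.UTF_8);
        return readAll(Channels.newChannel(new ByteArrayInputStream(data)));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```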
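Write operation
1. Write or append data into the buffer. At this point the buffer is in write mode.
2. Call the buffer's flip() method to switch from write mode to read mode, indicating that the channel will now be asked to read the data out of the buffer and write it to the backing data source.
3. Read the data out of the buffer. This is done by calling the channel's write() method, which asks the channel to read data from the buffer and write it to the backing data source. Note the channel's non-blocking nature.
4. Check the buffer's state to confirm the write progress: how much of the data has been written to the backing store by the channel, and how much has not. Repeat these four steps until all the data has been written.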
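A matching sketch for the four write steps, again under my own assumptions: BufferWriteCycle is a made-up class name, and a ByteArrayOutputStream wrapped with Channels.newChannel plays the role of the backing data source. I use compact() for step 4 so that any bytes the channel did not accept stay in the buffer for the next round.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

class BufferWriteCycle {

    /** Pushes text through an 8-byte buffer into an in-memory sink and returns the sink's bytes. */
    static byte[] writeAll(String text) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        WritableByteChannel channel = Channels.newChannel(sink);
        ByteBuffer buf = ByteBuffer.allocate(8);
        int offset = 0;
        try {
            // Continue while there is input left or the buffer still holds unwritten bytes.
            while (offset < bytes.length || buf.position() > 0) {
                int n = Math.min(buf.remaining(), bytes.length - offset);
                buf.put(bytes, offset, n);   // step 1: user writes into the buffer
                offset += n;
                buf.flip();                  // step 2: switch to read mode
                channel.write(buf);          // step 3: channel reads from the buffer
                buf.compact();               // step 4: keep whatever was not written yet
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(new String(writeAll("written through a small buffer"),
                StandardCharsets.UTF_8));
    }
}
```

With a real non-blocking SocketChannel, write() may accept only part of the buffer, which is why the loop checks buf.position() after compact() instead of assuming everything went out.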
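Simple code for the read operation can be found in section 2 of the quoted article.
How it works
To understand how a buffer works, you need to know its three properties.
capacity
position
limit
Capacity is the size of the buffer. It is fixed when the buffer is created and never changes. As described above, a buffer has modes: flip() switches it from write mode to read mode, and clear() or compact() prepare it for writing again. The meaning of position and limit depends on the mode. Look at the diagram first; the explanation follows.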
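Look at the right-hand diagram above, which shows a buffer in write mode. Here position is the next slot that can be written. It starts at 0 and grows as data is added. limit equals capacity and is the upper bound for position; position cannot exceed limit.
After the buffer is switched from write mode to read mode, both the meaning and the values of position and limit change. position points to the start of the buffer, that is 0, and limit points to where position was in write mode. As reading proceeds, position increases but can never exceed limit, because the space from limit onward is invalid: no data has been written there.
Switching the buffer back from read mode to write mode has three possible outcomes.
1. If the data in the buffer was neither fully discarded with clear() nor partially discarded with compact() while in read mode, nothing changes on switching back to write mode; it is as if no switch happened.
2. If the data was fully discarded with clear(), then on switching back to write mode position points to 0, as if this were an empty buffer.
3. If compact() was called, only the data already read is discarded. On switching back to write mode, the remaining unread data is moved to the start of the buffer and position is placed just after it, so the unread data is still kept.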
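The changes to position and limit described above are easy to check with a short trace. The sketch below is my own (the class name BufferStateTrace and the 8-byte capacity are arbitrary); it records the three properties after each call so the values can be compared against the description.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class BufferStateTrace {

    static String state(ByteBuffer b) {
        return "pos=" + b.position() + " lim=" + b.limit() + " cap=" + b.capacity();
    }

    /** Records position/limit/capacity after each step of a write, read, compact, clear cycle. */
    static List<String> trace() {
        List<String> out = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.allocate(8);
        out.add(state(buf));                    // fresh buffer in write mode

        buf.put(new byte[] {1, 2, 3, 4, 5});
        out.add(state(buf));                    // position advanced by the 5 bytes written

        buf.flip();
        out.add(state(buf));                    // read mode: limit = old position, position = 0

        buf.get();
        buf.get();
        out.add(state(buf));                    // two bytes consumed

        buf.compact();
        out.add(state(buf));                    // 3 unread bytes moved to the front

        buf.clear();
        out.add(state(buf));                    // everything discarded
        return out;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

Running it prints, in order: pos=0 lim=8 cap=8, pos=5 lim=8 cap=8, pos=0 lim=5 cap=8, pos=2 lim=5 cap=8, pos=3 lim=8 cap=8, pos=0 lim=8 cap=8.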
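Unfortunately the NIO exploration ends here, because among the implementations I found asynchronous ones, presumably the AIO I had heard about.
AIO versus BIO
Looking through the Channel implementations, two types provide asynchronous behaviour: the AsynchronousChannel interface and the AsynchronousFileChannel class. Looking further at AsynchronousChannel, several classes implement that interface.
While we are here, the AsynchronousFileChannel description: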
/** * An asynchronous channel for reading, writing, and manipulating a file. * * <p> An asynchronous file channel is created when a file is opened by invoking * one of the {@link #open open} methods defined by this class. The file contains * a variable-length sequence of bytes that can be read and written and whose * current size can be {@link #size() queried}. The size of the file increases * when bytes are written beyond its current size; the size of the file decreases * when it is {@link #truncate truncated}. * * <p> An asynchronous file channel does not have a <i>current position</i> * within the file. Instead, the file position is specified to each read and * write method that initiates asynchronous operations. A {@link CompletionHandler} * is specified as a parameter and is invoked to consume the result of the I/O * operation. This class also defines read and write methods that initiate * asynchronous operations, returning a {@link Future} to represent the pending * result of the operation. The {@code Future} may be used to check if the * operation has completed, wait for its completion, and retrieve the result. * * <p> In addition to read and write operations, this class defines the * following operations: </p> * * <ul> * * <li><p> Updates made to a file may be {@link #force <i>forced * out</i>} to the underlying storage device, ensuring that data are not * lost in the event of a system crash. </p></li> * * <li><p> A region of a file may be {@link #lock <i>locked</i>} against * access by other programs. </p></li> * * </ul> * * <p> An {@code AsynchronousFileChannel} is associated with a thread pool to * which tasks are submitted to handle I/O events and dispatch to completion * handlers that consume the results of I/O operations on the channel. 
The * completion handler for an I/O operation initiated on a channel is guaranteed * to be invoked by one of the threads in the thread pool (This ensures that the * completion handler is run by a thread with the expected <em>identity</em>). * Where an I/O operation completes immediately, and the initiating thread is * itself a thread in the thread pool, then the completion handler may be invoked * directly by the initiating thread. When an {@code AsynchronousFileChannel} is * created without specifying a thread pool then the channel is associated with * a system-dependent default thread pool that may be shared with other * channels. The default thread pool is configured by the system properties * defined by the {@link AsynchronousChannelGroup} class. * * <p> Channels of this type are safe for use by multiple concurrent threads. The * {@link Channel#close close} method may be invoked at any time, as specified * by the {@link Channel} interface. This causes all outstanding asynchronous * operations on the channel to complete with the exception {@link * AsynchronousCloseException}. Multiple read and write operations may be * outstanding at the same time. When multiple read and write operations are * outstanding then the ordering of the I/O operations, and the order that the * completion handlers are invoked, is not specified; they are not, in particular, * guaranteed to execute in the order that the operations were initiated. The * {@link java.nio.ByteBuffer ByteBuffers} used when reading or writing are not * safe for use by multiple concurrent I/O operations. Furthermore, after an I/O * operation is initiated then care should be taken to ensure that the buffer is * not accessed until after the operation has completed. * * <p> As with {@link FileChannel}, the view of a file provided by an instance of * this class is guaranteed to be consistent with other views of the same file * provided by other instances in the same program. 
The view provided by an * instance of this class may or may not, however, be consistent with the views * seen by other concurrently-running programs due to caching performed by the * underlying operating system and delays induced by network-filesystem protocols. * This is true regardless of the language in which these other programs are * written, and whether they are running on the same machine or on some other * machine. The exact nature of any such inconsistencies are system-dependent * and are therefore unspecified. * * @since 1.7 */
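The key points are roughly:
1. An AsynchronousFileChannel is created by opening a file, after which the reading and writing are handed to the channel object.
2. Read and write operations return a Future, which can be queried to see whether the work has completed. This is how the asynchronous mode works.
Here are the two methods: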
read: public abstract Future<Integer> read(ByteBuffer dst, long position);
/** * Reads a sequence of bytes from this channel into the given buffer, * starting at the given file position. * * <p> This method initiates the reading of a sequence of bytes from this * channel into the given buffer, starting at the given file position. This * method returns a {@code Future} representing the pending result of the * operation. The {@code Future}'s {@link Future#get() get} method returns * the number of bytes read or {@code -1} if the given position is greater * than or equal to the file's size at the time that the read is attempted. * * <p> This method works in the same manner as the {@link * AsynchronousByteChannel#read(ByteBuffer)} method, except that bytes are * read starting at the given file position. If the given file position is * greater than the file's size at the time that the read is attempted then * no bytes are read. * * @param dst * The buffer into which bytes are to be transferred * @param position * The file position at which the transfer is to begin; * must be non-negative * * @return A {@code Future} object representing the pending result * * @throws IllegalArgumentException * If the position is negative or the buffer is read-only * @throws NonReadableChannelException * If this channel was not opened for reading */ public abstract Future<Integer> read(ByteBuffer dst, long position);
write: public abstract Future<Integer> write(ByteBuffer src, long position);
/** * Writes a sequence of bytes to this channel from the given buffer, starting * at the given file position. * * <p> This method initiates the writing of a sequence of bytes to this * channel from the given buffer, starting at the given file position. The * method returns a {@code Future} representing the pending result of the * write operation. The {@code Future}'s {@link Future#get() get} method * returns the number of bytes written. * * <p> This method works in the same manner as the {@link * AsynchronousByteChannel#write(ByteBuffer)} method, except that bytes are * written starting at the given file position. If the given position is * greater than the file's size, at the time that the write is attempted, * then the file will be grown to accommodate the new bytes; the values of * any bytes between the previous end-of-file and the newly-written bytes * are unspecified. * * @param src * The buffer from which bytes are to be transferred * @param position * The file position at which the transfer is to begin; * must be non-negative * * @return A {@code Future} object representing the pending result * * @throws IllegalArgumentException * If the position is negative * @throws NonWritableChannelException * If this channel was not opened for writing */ public abstract Future<Integer> write(ByteBuffer src, long position);
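Each of these methods has a twin that also takes a CompletionHandler to run when the operation completes; with it, the read or write is handed over to the channel entirely.
AIO is a truly asynchronous model: the thread initiates an asynchronous I/O operation and the system is responsible for completing it. The thread can wait for the asynchronous I/O to finish and then carry on with the follow-up work; this is called the "future style". Alternatively, the thread specifies the follow-up processing at the moment it initiates the I/O; this is the "callback style". The callback style relies on a thread pool: when the system finishes the asynchronous I/O, it submits the follow-up processing as a task to the pool, and an idle pool thread picks the task up and runs it.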
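Both styles can be tried against a small temporary file. The following is a rough sketch rather than a recommended pattern: the class name AsyncFileSketch, the temp-file prefix and the 5-second timeouts are my own choices, and it assumes a small non-empty file that a single read returns in full. A CompletableFuture is used only to carry the callback result back to the calling thread.

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class AsyncFileSketch {

    /** Reads the same file twice: {result via Future, result via CompletionHandler}. */
    static String[] readBothStyles(String text) {
        Path tmp = null;
        try {
            byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
            tmp = Files.createTempFile("aio-demo", ".txt");
            Files.write(tmp, bytes);

            try (AsynchronousFileChannel ch =
                         AsynchronousFileChannel.open(tmp, StandardOpenOption.READ)) {
                // Future style: start the read, then wait for the result when it is needed.
                ByteBuffer a = ByteBuffer.allocate(bytes.length);
                Future<Integer> pending = ch.read(a, 0);
                int n = pending.get(5, TimeUnit.SECONDS);
                String viaFuture = new String(a.array(), 0, n, StandardCharsets.UTF_8);

                // Callback style: say up front what should happen when the read completes.
                ByteBuffer b = ByteBuffer.allocate(bytes.length);
                CompletableFuture<String> viaCallback = new CompletableFuture<>();
                ch.read(b, 0, b, new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer count, ByteBuffer buf) {
                        // Runs on a thread from the channel's thread pool.
                        viaCallback.complete(
                                new String(buf.array(), 0, count, StandardCharsets.UTF_8));
                    }

                    @Override
                    public void failed(Throwable exc, ByteBuffer buf) {
                        viaCallback.completeExceptionally(exc);
                    }
                });
                return new String[] {viaFuture, viaCallback.get(5, TimeUnit.SECONDS)};
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            if (tmp != null) {
                tmp.toFile().delete();
            }
        }
    }

    public static void main(String[] args) {
        String[] r = readBothStyles("async hello");
        System.out.println(r[0] + " / " + r[1]);
    }
}
```

In the future style the calling thread still ends up waiting in get(); only the callback style lets it move on completely, which is the distinction drawn above.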
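I/O summary
BIO (traditional java.io) is synchronous and blocking, so handling multiple tasks requires multiple threads. NIO instead uses a single thread that polls for events: by efficiently locating the ready channels it decides what to do, and only the select phase blocks. This avoids the frequent thread switching that a large number of client connections would otherwise cause, and it greatly improves the application's scalability.
AIO goes one step further: rather than notifying the thread when the I/O is ready, it notifies the thread after the I/O operation has already completed. AIO therefore does not block; the business logic becomes a callback that the system triggers automatically once the I/O operation finishes.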
NETTY
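Having covered NIO and AIO, we may as well go all the way to Netty.
What is Netty? It usually comes up when people talk about microservice communication or reactive programming. Netty is an asynchronous event-driven network application framework for rapid development of maintainable high-performance servers and clients. At its core it is built on NIO.
So far my understanding of Netty is quite limited, because none of the projects I know that really use Netty have gone live. What I can see directly is that Netty's wrapping of the NIO interfaces makes them much more convenient to use. Below, a data-flow diagram and an example illustrate Netty; more detailed notes will follow later.
Image source: https://www.cnblogs.com/imstudy/p/9908791.html
Several key points can be identified in the diagram: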
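1. Two NioEventLoopGroups are created at initialization: bossGroup handles Accept events (connection establishment) and dispatches requests, and workerGroup handles I/O read/write events and business logic.
2. A NioEventLoopGroup is a group of event loops. It contains multiple NioEventLoops, each of which has one Selector and one event-loop thread.
Each Boss NioEventLoop repeats three steps:
1) Poll for Accept events;
2) Handle the Accept I/O event: establish the connection with the client, create a NioSocketChannel, and register it with the Selector of one of the Worker NioEventLoops;
3) Process the tasks in the task queue (runAllTasks). These include tasks the user runs through eventloop.execute or schedule, and tasks submitted to this event loop by other threads.
Each Worker NioEventLoop repeats three steps:
1) Poll for Read and Write events;
2) Handle the I/O events, that is Read and Write events, when the NioSocketChannel becomes readable or writable;
3) Process the tasks in the task queue (runAllTasks).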
Example
Finally, a Netty example. Source: https://www.jianshu.com/p/a4e03835921a
NettyClient.java
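import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Date;

/**
 * @author 闪电侠
 */
public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        Bootstrap bootstrap = new Bootstrap();
        NioEventLoopGroup group = new NioEventLoopGroup();

        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) {
                        // Encode outgoing Strings into bytes
                        ch.pipeline().addLast(new StringEncoder());
                    }
                });

        // connect() is asynchronous; channel() returns without waiting for the connection
        Channel channel = bootstrap.connect("127.0.0.1", 8000).channel();

        // Send a timestamped message every two seconds
        while (true) {
            channel.writeAndFlush(new Date() + ": hello world!");
            Thread.sleep(2000);
        }
    }
}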
NettyServer.java
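import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;

/**
 * @author 闪电侠
 */
public class NettyServer {
    public static void main(String[] args) {
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        // boss accepts connections, worker handles reads and writes
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();

        serverBootstrap
                .group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) {
                        // Decode incoming bytes into Strings
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                                System.out.println(msg);
                            }
                        });
                    }
                })
                .bind(8000);
    }
}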
To be continued...