根据UNIX网络编程对I/O模型的分类,UNIX提供了5种I/O模型,这里只讲述阻塞I/O模型,非阻塞I/O模型,I/O复用模型这三种;
同步I/O操作:导致请求进程阻塞,直到I/O操作完成;
异步I/O操作:不导致请求进程阻塞;
阻塞I/O模型(BIO):
最常用的I/O模型就是阻塞I/O模型,缺省情况下,所有文件操作都是阻塞的;以套接字为例,在进程空间中调用recvfrom,其系统调用直到数据包到达且被复制到应用进程的缓冲区中或发送错误时才返回,在此期间一直会等待,进程在从调用recvfrom开始到它返回的整段时间内都是阻塞的;
-
同步阻塞I/O服务端通信模型(一客户端一线程)
由一个独立的Acceptor线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成之后,通过输出流返回应答给客户端,线程销毁;
该模型最大的问题缺乏弹性的伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问数呈1:1的关系,由于线程是Java虚拟机非常宝贵的系统资源,当线程数暴涨后,系统性能将会急剧下降,随着并发访问量的继续增大,系统会发生线程堆栈溢出,创建新线程失败等问题(注:系统开辟线程数是有上限的),并最终导致进程宕机或僵死,不能对外提供服务;
-
采用线程池和任务队列实现的伪异步I/O模型
当有新的客户端接入的时候,将客户端的Socket封装成一个Task(该任务实现 java.lang.Runnable接口)投递到后端的线程池中进行处理,JDK的线程池维护一个消息队列(即线程池配置的阻塞队列)和N个活跃线程(即线程池配置的核心线程数)对任务队列中的任务进行处理;
上述的伪异步I/O模型对之前的一客户一线程的模型做了简单的优化,但是它无法解决BIO在通信阻塞上的问题;
如服务端处理缓慢,导致响应超时;
-
BIO存在的问题
-
每个请求都需要创建独立的线程,与对应的客户端进行数据读,业务处理进行数据写;
-
当并发数较大时,需要创建大量线程来处理连接,系统资源占用较大;
-
连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在读操作上,造成线程资源浪费;
非阻塞I/O模型(NIO):
recvfrom从应用层到内核的时候,如果该缓冲区没有数据的话,就直接返回一个EWOULDBLOCK错误,一般都对非阻塞I/O模型进行轮询检查这个状态,看内核是不是有数据到来;
上图中前三次调用recvfrom时没有数据可返回,因此内核会立即返回一个EWOULDBLOCK错误,当第四次调用recvfrom时已有一个数据报准备好,它被复制到应用进程缓冲区,应用调用recvfrom成功返回;
应用进程对一个非阻塞fd循环调用recvfrom,即为轮询;应用进程持续轮询内核,以查看fd是否就绪,这样会耗费大量CPU资源;
I/O多路复用模型:
I/O多路复用是一种同步I/O模型,同一个线程可以监听多个文件句柄(即fd),处理多个I/O事件;I/O多路指的是多个文件句柄(即fd),复用指的是该过程的多个I/O事件由同一个线程处理;
Linux提供select/poll函数调用,进程通过将一个或多个fd传递给select或poll调用,阻塞在select/poll调用上,可侦测一个或多个fd是否处于就绪状态(即等待数据报套接字变为可读),当select/poll返回套接字可读状态时,应用进程调用recvfrom把数据报复制到应用进程缓冲区;而select/poll是顺序扫描fd的;
Linux还提供一种epoll函数调用,epoll使用事件驱动的方式替代顺序扫描fd,因此性能更高;当fd有序时,立即回调返回;
I/O多路复用的优势在于应用进程可以等待多个fd就绪;
关于I/O模型的比较如下
select | poll | epoll(JDK 1.5及以上) | |
---|---|---|---|
操作方式 | 遍历 | 每次调用都进行 线性遍历,时间 复杂度为O(n) | 回调 |
底层实现 | 数组 | 链表 | 哈希表 |
IO效率 | 每次调用都进行线 性遍历,时间复杂 度为O(n) | 每次调用都进行 线性遍历,时间 复杂度为O(n) | 事件通知方式,每当有IO事件 就绪,系统注册的回调函数就 会被调用,时间复杂度O(1) |
最大连接 | 有上限,1024个fd | 无上限 |
在JDK1.4推出Java NIO之前,Java的Socket通信采用的时同步阻塞模式,即BIO;
JDK1.4版本提供了新的NIO类库,这里NIO不是单纯的非阻塞I/O,NIO为New I/O,它是非阻塞I/O和多路I/O的结合;
Java NIO 有三大核心部分:Channel(通道),Buffer(缓冲区), Selector(选择器);
- Channel
Channel是一个通道,可以通过它读取和写入数据,通道与流不同在于通道是双向的,流只能一个方向移动(一个流必须是InputStream或者OutputStream的子类),而通道可以用于读,写或同时读写;
- Buffer
Buffer(缓冲区)本质上是一个可以读写数据的内存块,可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况;Channel 提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由 Buffer;
- Selector
可以用一个线程,处理多个的客户端连接,就会使用到Selector(选择器)可以用一个线程,处理多个的客户端连接,就会使用到Selector(选择器);
Selector(选择器) 能够检测多个注册的通道上是否有事件发生(注意:多个Channel以事件的方式可以注册到同一个Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理;这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求;
只有在 连接/通道 真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程;避免了多线程之间的上下文切换导致的开销;
Channel 会注册到 Selector 上,由 Selector 根据 Channel 读写事件的发生将其交由某个空闲的线程处理(也可以是单线程,如Redis6之前的线程模型是单线程I/O多路的,Redis6开始更新线程模型为多线程模型,如果是单线程Selector读取到一个大Key,由于读取处理时间较长,对于后面处理监听的事件会有影响,那么不推荐使用大Key的说法就能说通了);
SelectionKey,表示 Selector 和网络通道的注册关系;
int OP_ACCEPT:有新的网络连接可以 accept,值为 16 int OP_CONNECT:代表连接已经建立,值为 8 int OP_READ:代表读操作,值为 1 int OP_WRITE:代表写操作,值为 4 源码中: public static final int OP_READ = 1 << 0; public static final int OP_WRITE = 1 << 2; public static final int OP_CONNECT = 1 << 3; public static final int OP_ACCEPT = 1 << 4;
ServerSocketChannel 在服务器端监听新的客户端 Socket 连接;
SocketChannel,网络 IO 通道,具体负责进行读写操作;NIO 把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区;
测试demo如下:
服务端
public class NIOSocketServer { //public static ExecutorService pool = Executors.newFixedThreadPool(10); public static void main(String[] args) throws IOException { // 创建一个在本地端口进行监听的服务Socket通道.并设置为非阻塞方式 ServerSocketChannel ssc = ServerSocketChannel.open(); //必须配置为非阻塞才能往selector上注册,否则会报错,selector模式本身就是非阻塞模式 ssc.configureBlocking(false); ssc.socket().bind(new InetSocketAddress(9000)); // 创建一个选择器selector Selector selector = Selector.open(); // 把ServerSocketChannel注册到selector上,并且selector对客户端accept连接操作感兴趣 ssc.register(selector, SelectionKey.OP_ACCEPT); while (true) { System.out.println("等待事件发生.."); // 轮询监听channel里的key,select是阻塞的,accept()也是阻塞的 int select = selector.select(); System.out.println("有事件发生了.."); // 有客户端请求,被轮询监听到 Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = it.next(); //删除本次已处理的key,防止下次select重复处理 it.remove(); handle(key); } } } private static void handle(SelectionKey key) throws IOException { if (key.isAcceptable()) { System.out.println("有客户端连接事件发生了.."); ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); //NIO非阻塞体现:此处accept方法是阻塞的,但是这里因为是发生了连接事件,所以这个方法会马上执行完,不会阻塞 //处理完连接请求不会继续等待客户端的数据发送 SocketChannel sc = ssc.accept(); sc.configureBlocking(false); //通过Selector监听Channel时对读事件感兴趣 sc.register(key.selector(), SelectionKey.OP_READ); } else if (key.isReadable()) { System.out.println("有客户端数据可读事件发生了.."); SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); //NIO非阻塞体现:首先read方法不会阻塞,其次这种事件响应模型,当调用到read方法时肯定是发生了客户端发送数据的事件 int len = sc.read(buffer); if (len != -1) { System.out.println("读取到客户端发送的数据:" + new String(buffer.array(), 0, len)); } ByteBuffer bufferToWrite = ByteBuffer.wrap("HelloClient".getBytes()); sc.write(bufferToWrite); key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } else if (key.isWritable()) { SocketChannel sc = (SocketChannel) key.channel(); System.out.println("write事件"); // NIO事件触发是水平触发 // 使用Java的NIO编程的时候,在没有数据可以往外写的时候要取消写事件, // 在有数据往外写的时候再注册写事件 key.interestOps(SelectionKey.OP_READ); sc.close(); } } }
客户端
public class NIOSocketClient { //通道管理器 private Selector selector; /** * 启动客户端测试 * * @throws IOException */ public static void main(String[] args) throws IOException { NIOSocketClient client = new NIOSocketClient(); client.initClient("127.0.0.1", 9000); client.connect(); } /** * 获得一个Socket通道,并对该通道做一些初始化的工作 * * @param ip 连接的服务器的ip * @param port 连接的服务器的端口号 * @throws IOException */ public void initClient(String ip, int port) throws IOException { // 获得一个Socket通道 SocketChannel channel = SocketChannel.open(); // 设置通道为非阻塞 channel.configureBlocking(false); // 获得一个通道管理器 this.selector = Selector.open(); // 客户端连接服务器,其实方法执行并没有实现连接,需要在listen()方法中调 //用channel.finishConnect();才能完成连接 channel.connect(new InetSocketAddress(ip, port)); //将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_CONNECT事件。 channel.register(selector, SelectionKey.OP_CONNECT); } /** * 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理 * * @throws IOException */ public void connect() throws IOException { boolean flag = false; // 轮询访问selector while (!flag) { selector.select(); // 获得selector中选中的项的迭代器 Iterator<SelectionKey> it = this.selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = (SelectionKey) it.next(); // 删除已选的key,以防重复处理 it.remove(); // 连接事件发生 if (key.isConnectable()) { SocketChannel channel = (SocketChannel) key.channel(); // 如果正在连接,则完成连接 if (channel.isConnectionPending()) { channel.finishConnect(); } // 设置成非阻塞 channel.configureBlocking(false); //在这里可以给服务端发送信息哦 ByteBuffer buffer = ByteBuffer.wrap("HelloServer".getBytes()); channel.write(buffer); //在和服务端连接成功之后,为了可以接收到服务端的信息,需要给通道设置读的权限。 channel.register(this.selector, SelectionKey.OP_READ); // 获得了可读的事件 } else if (key.isReadable()) { read(key); SelectableChannel channel = key.channel(); channel.close(); flag = true; } } } } /** * 处理读取服务端发来的信息 的事件 * * @param key * @throws IOException */ public void read(SelectionKey key) throws IOException { //和服务端的read方法一样 // 服务器可读取消息:得到事件发生的Socket通道 SocketChannel channel = (SocketChannel) key.channel(); // 创建读取的缓冲区 ByteBuffer buffer = ByteBuffer.allocate(512); int len = channel.read(buffer); if (len != -1) { System.out.println("客户端收到信息:" + new String(buffer.array(), 0, len)); } } }
参考:
《Netty权威指南》
《UNIX网络编程卷1》