一、NIO概述
1. BIO带来的挑战
BIO即堵塞式I/O,数据在写入或读取时都有可能堵塞,一旦有堵塞,线程将失去CPU的使用权,性能较差。
2. NIO工作机制
Java NIO由Channel、Buffer、Selector三个核心组成,NIO框架类结构图如下:
其中,Buffer主要负责存取数据,Channel用于数据传输,获取数据,然后流入Buffer;或从Buffer取数据,发送出去。
Selector允许单线程处理多个Channel,如果打开了多个连接(Channel),但每个连接的数据流量很小,使用Selector则很方便。
二、Channel
Channel类主要位于java.nio.channels包下,类结构图如下:
Channel跟流相似,但流是单向的,而Channel是双向的。Channel总是从Buffer获取数据(写文件)或将数据写入Buffer(读文件时)。常用的Channel如下:
- FileChannel,从文件中读写数据。
- DatagramChannel,能通过UDP读写网络中的数据。
- SocketChannel,能通过TCP读写网络中的数据。
- ServerSocketChannel,可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。
package com.yyn.nio; import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; public class ByteBufferTest { public static void main(String [] args) throws IOException{ // testRead(); //从文件读取数据,chanel向buffer中写数据 testWrite(); } //写文件 public static void testWrite() throws IOException{ RandomAccessFile raFile = new RandomAccessFile("byte_buffer_write.txt", "rw"); FileChannel fChannel = raFile.getChannel(); String data = "天王盖地虎 小鸡炖蘑菇 要从此路过 就得跳支舞"; byte[] dataByte = data.getBytes("UTF-8"); System.out.println(dataByte.length); ByteBuffer buf = ByteBuffer.allocate(dataByte.length); buf.put(dataByte, 0, dataByte.length); buf.flip(); //切换buffer到读模式 fChannel.write(buf); //从buffer读取数据到channel fChannel.force(true); //强制将数据刷新到磁盘,不一定有用 buf.clear(); buf.put(dataByte, 0, 10); buf.mark(); buf.put(dataByte,10,10); buf.reset(); buf.put(dataByte, 0, 10); buf.flip(); //切换buffer到读模式 fChannel.write(buf); //从buffer读取数据到channel fChannel.close(); System.out.println("write over!!"); } //读文件 public static void testRead() throws IOException { RandomAccessFile raFile = new RandomAccessFile("test.txt", "rw"); FileChannel fChannel = raFile.getChannel(); ByteBuffer buf = ByteBuffer.allocate(10); int byteRead = fChannel.read(buf); StringBuffer sBuffer = new StringBuffer(); while(byteRead != -1){ buf.flip(); //change to read mode byte [] bs = null; int limite = buf.limit(); if(buf.hasArray()){ bs = buf.array(); } if(bs != null){ System.out.println("bs length: "+limite); sBuffer.append(new String(bs,0,limite ,"UTF-8")); } buf.clear(); // make buffer ready for write,clear all buffer //buf.compact(); // make buffer ready for write,clear data readed in buffer byteRead = fChannel.read(buf); } fChannel.close(); System.out.println("####:"+sBuffer.toString()); } }
2.NIO优化方法
2.1 FileChannel.transformXXX方法
2.2 FileChannel.map方法
三、Buffer
Buffer是一片缓冲区,可读可写,非线程安全的,NIO包中针对常用的类型设置了Buffer,类结构图如下:
要使用Buffer,需记住3个方法和4个特性
- flip()方法,切换Buffer为读状态,此时Buffer可读。limit设置为position,position设置为0
- clear()方法,切换Buffer为写状态,会清空Buffer里所有数据。position为0,limit置为capacity
- compact()方法,切换Buffer为写状态,清空Buffer里所有已读数据,将未读数据剪切到Buffer前端。position设置为limit,limit设置为capacity
要理解其4个特性,
- capacity,Buffer的总长度,该值总是保持不变。A buffer's capacity is the number of elements it contains. The capacity of a buffer is never negative and never changes
- position,下一个要操作的数据元素的位置,该值总是小于等于capacity和limit。Buffer为读状态时,表示下一个要读的位置,Buffer为写状态时,表示下一个要写的位置。
A buffer's position is the index of the next element to be read or written. A buffer's position is never negative and is never greater than its limit.
- limit,Buffer中第一个不可操作元素的位置,limit<=capacity。A buffer's limit is the index of the first element that should not be read or written. A buffer's limit is never negative and is never greater than its capacity.
- mark,用于记录当前position的前一个位置
Buffer状态转换过程描述
从Buffer中读数据方式:buffer.get()方法和channel.write()方法。
向Buffer中写数据方式:buffer.put()方法和channel.read()方法。
package com.yyn.nio; import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; public class ByteBufferTest { public static void main(String [] args) throws IOException{ // testRead(); //从文件读取数据,chanel向buffer中写数据 testWrite(); } //写文件 public static void testWrite() throws IOException{ RandomAccessFile raFile = new RandomAccessFile("byte_buffer_write.txt", "rw"); FileChannel fChannel = raFile.getChannel(); String data = "天王盖地虎 小鸡炖蘑菇 要从此路过 就得跳支舞"; byte[] dataByte = data.getBytes("UTF-8"); System.out.println(dataByte.length); ByteBuffer buf = ByteBuffer.allocate(dataByte.length); buf.put(dataByte, 0, dataByte.length); buf.flip(); //切换buffer到读模式 fChannel.write(buf); //从buffer读取数据到channel fChannel.force(true); //强制将数据刷新到磁盘,不一定有用 buf.clear(); buf.put(dataByte, 0, 10); buf.mark(); buf.put(dataByte,10,10); buf.reset(); buf.put(dataByte, 0, 10); buf.flip(); //切换buffer到读模式 fChannel.write(buf); //从buffer读取数据到channel fChannel.close(); System.out.println("write over!!"); } //读文件 public static void testRead() throws IOException { RandomAccessFile raFile = new RandomAccessFile("test.txt", "rw"); FileChannel fChannel = raFile.getChannel(); ByteBuffer buf = ByteBuffer.allocate(10); int byteRead = fChannel.read(buf); StringBuffer sBuffer = new StringBuffer(); while(byteRead != -1){ buf.flip(); //change to read mode byte [] bs = null; int limite = buf.limit(); if(buf.hasArray()){ bs = buf.array(); } if(bs != null){ System.out.println("bs length: "+limite); sBuffer.append(new String(bs,0,limite ,"UTF-8")); } buf.clear(); // make buffer ready for write,clear all buffer //buf.compact(); // make buffer ready for write,clear dat a readed in buffer byteRead = fChannel.read(buf); } fChannel.close(); System.out.println("####:"+sBuffer.toString()); } }
2. Buffer其他方法介绍
2.1 rewind()方法
Buffer.rewind()将position设回0,所以你可以重读Buffer中的所有数据。limit保持不变,仍然表示能从Buffer中读取多少。
2.2 equals()方法
当满足下列条件时,表示两个Buffer相等:
有相同的类型(byte、char、int等)。
Buffer中剩余的byte、char等的个数相等。
Buffer中所有剩余的byte、char等都相同。
如你所见,equals只是比较Buffer的一部分,不是每一个在它里面的元素都比较。实际上,它只比较Buffer中的剩余元素。
2.3 compareTo()方法
compareTo()方法比较两个Buffer的剩余元素(byte、char等), 如果满足下列条件,则认为一个Buffer“小于”另一个Buffer:
第一个不相等的元素小于另一个Buffer中对应的元素 。
所有元素都相等,但第一个Buffer比另一个先耗尽(第一个Buffer的元素个数比另一个少)。
3. Buffer的Scatter/Gather
scatter(分散)是指从Channel读取数据后,写入到多个Buffer中。
gather(聚集)是指写操作时,从多个Buffer读取数据并写入到一个Channel中。
四、Selector
Selector在NIO编程中充当一个调度器的角色,轮训在其注册的channel是否ready,若ready则开始执行操作。
仅用单个线程来处理多个Channels的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程处理所有的通道。对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统的一些资源(如内存)。因此,使用的线程越少越好。
但是,需要记住,现代的操作系统和CPU在多任务方面表现的越来越好,所以多线程的开销随着时间的推移,变得越来越小了。实际上,如果一个CPU有多个内核,不使用多任务可能是在浪费CPU能力。不管怎么说,关于那种设计的讨论应该放在另一篇不同的文章中。在这里,只要知道使用Selector能够处理多个通道就足够了。
1. Selector介绍
Selector包含3个Set对象来管理SelectionKey对象,分别是以下三种:
使用Selector前,需要确保以下操作已经执行完成:
- Selector selector = Selector.open(); //调用open方法,获取一个Selector实例。
- channel.configureBlocking(false); // 设置Channel为非堵塞模式
- channel.register(selector , SelectionKey.OP_ACCEPT); //将Channel注册到selector中,并设置需监听的事件
可以监听四种不同类型的事件:
- SelectionKey.OP_CONNECT
- SelectionKey.OP_ACCEPT
- SelectionKey.OP_READ
- SelectionKey.OP_WRITE
如果你对不止一种事件感兴趣,那么可以用“位或”操作符将常量连接起来,如下:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
2. Selector常用方法
2.1 selectXXX()方法
- int select(),返回就绪channel的个数,会堵塞。
- int select(long timeout),返回就绪channel个数,堵塞timeout
- int selectNow(),返回就绪channel个数,不堵塞
注意:每次调用selectXXX方法时,会返回此次就绪数量,例如,有一个channel就绪,则返回1,但未对这个channel的数据进行处理。接下来又有一个channel就绪,调用selectXXX方法还是返回1,但实际上此时有2个channel就绪但未被处理。
2.2 selectedKeys()
一旦调用了select()方法,并且返回值表明有一个或更多个通道就绪了,然后可以通过调用selector的selectedKeys()方法,访问“已选择键集(selected key set)”中的就绪通道。
当像Selector注册Channel时,Channel.register()方法会返回一个SelectionKey 对象。这个对象代表了注册到该Selector的通道。可以通过SelectionKey的selectedKeySet()方法访问这些对象。
注意:Selector不会自己从已选择键集中移除SelectionKey实例。必须在处理完通道时自己移除。下次该通道变成就绪时,Selector会再次将其放入已选择键集中。
2.3 wakeUp()
某个线程调用select()方法后阻塞了,即使没有通道已经就绪,也有办法让其从select()方法返回。只要让其它线程在第一个线程调用select()方法的那个对象上调用Selector.wakeup()方法即可。阻塞在select()方法上的线程会立马返回。
如果有其它线程调用了wakeup()方法,但当前没有线程阻塞在select()方法上,下个调用select()方法的线程会立即“醒来(wake up)”。
2.4 close()
用完Selector后调用其close()方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效。通道本身并不会关闭。
3. 基于NIO的网络Demo
3.1 单线程版,主线程负责处理accept和read
package com.yyn.nio.net; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 本例子服务端只处理accept和read事件,单线程版 * * @author Michael * */ public class NIOSingleServer { private Selector selector = null; //private ExecutorService pool; public static Charset charset = Charset.forName("UTF-8"); public NIOSingleServer init(int port) throws IOException { //pool = Executors.newFixedThreadPool(5); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); // 设置为非堵塞模式 ssc.socket().bind(new InetSocketAddress(port)); selector = Selector.open(); // 获取一个selector ssc.register(selector, SelectionKey.OP_ACCEPT); return this; } public void listen() throws IOException { System.out.println("Server started....."); while (true) { int n = 0; n = selector.select(); // 获取就绪操作的个数 if(n == 0){ continue; } Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); // 每次使用后需要手工移除 SocketChannel channel = null; if (key.isAcceptable()) { try { // init函数中注册的是ServerSocketChannel ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); // 获取实际的SocketChannel,类似于Socket IO中的Socket channel = serverSocketChannel.accept(); System.out.println("客户端:" + channel.getRemoteAddress() + "已连接"); channel.configureBlocking(false); SelectionKey k = channel.register(selector, SelectionKey.OP_READ); // 注册read监听,监听客户端发过来的数据 //Worker worker = new Worker(k); //k.attach(worker); } catch (Exception e) { if (channel != null) { channel.close(); } } } else { if (key.isReadable()) { System.out.println("begin to process read!!!!"); channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.clear(); //切换buffer为写模式 int len = 0; try{ while ((len = channel.read(buffer)) > 0) { buffer.flip(); //切换buffer为read模式 System.out.println("客户端数据:"+charset.decode(buffer).toString()); buffer.clear(); } if(len == -1){ // The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream System.out.println("客户端断开"); channel.close(); continue; } }catch(Exception e){ System.out.println("客户端异常啦"); } } if (key.isWritable()) { } } } } } public static void main(String[] args) throws IOException { // TODO Auto-generated method stub NIOSingleServer server = new NIOSingleServer(); server.init(12003).listen(); } }
package com.yyn.nio.net; import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.net.Socket; import java.net.UnknownHostException; public class NIOSingleClient { public static void main(String[] args) throws UnknownHostException, IOException { // TODO Auto-generated method stub Socket socket = new Socket("127.0.0.1", 12003); OutputStreamWriter osw = new OutputStreamWriter(socket.getOutputStream(), "UTF-8"); PrintWriter out = new PrintWriter(osw); InputStreamReader isr = new InputStreamReader(new BufferedInputStream(System.in), "UTF-8"); BufferedReader in = new BufferedReader(isr); String data = ""; while(true){ data = in.readLine(); data = data.trim().toUpperCase(); if(data.equals("EIXT")){ out.close(); socket.close(); System.exit(0); } System.out.println("read data from comsole:" + data); out.println(data); out.flush(); System.out.println("sending data to server:" + data); } } }
3.2 多线程版,主线程负责accept,子线程负责read
package com.yyn.nio.net; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator; import java.util.logging.LoggingMXBean; /** * 本例子服务端只处理accept和read事件,多线程版 * @author Michael * */ public class NIOMultiServer { private Selector acceptSelector = null; private Selector readSelector = null; public static Charset charset = Charset.forName("UTF-8"); public NIOMultiServer init(int port) throws IOException { //pool = Executors.newFixedThreadPool(5); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); // 设置为非堵塞模式 ssc.socket().bind(new InetSocketAddress(port)); acceptSelector = Selector.open(); // 获取一个selector readSelector = Selector.open(); ssc.register(acceptSelector, SelectionKey.OP_ACCEPT); return this; } public void listen() throws IOException { System.out.println("Server started....."); new Worker(this.readSelector).start(); while (true) { int n = 0; n = acceptSelector.select(); if(n == 0) continue; Iterator<SelectionKey> it = acceptSelector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); // init函数中注册的是ServerSocketChannel ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); // 获取实际的SocketChannel,类似于Socket IO中的Socket,必须accept后才有SocketChannel SocketChannel channel = serverSocketChannel.accept(); channel.configureBlocking(false); System.out.println("客户端:" + channel.getRemoteAddress() + "已连接"); if(key.isAcceptable()){ channel.register(this.readSelector, SelectionKey.OP_READ); } } } } private static class Worker extends Thread{ private Selector readSelector = null; public Worker(Selector selector){ this.readSelector = selector; } public void run(){ System.out.println("Read thread started...."); while (true) { int n = 0; SocketChannel channel = null; try { n= this.readSelector.select(10); if(n == 0) continue; System.out.println("read thread, n is: " + n); Iterator<SelectionKey> it = readSelector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); if(key.isReadable()){ channel = (SocketChannel) key.channel(); System.out.println("begin to process read at: " + channel.getRemoteAddress()); ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.clear(); //将buffer切换为写模式 long len = 0; while((len = channel.read(buffer)) > 0){ buffer.flip(); //将buffer切换为读模式 System.out.println("客户端数据:"+charset.decode(buffer).toString()); buffer.clear(); } if(len == -1){ System.out.println("客户端断开"); channel.close(); continue; } } } } catch (IOException e) { System.out.println("客户端异常啦"); try { channel.close(); } catch (IOException e1) { // TODO Auto-generated catch block System.out.println("关闭channel发生异常"); } } } } } public static void main(String[] args) throws IOException { NIOMultiServer server = new NIOMultiServer(); server.init(12003); server.listen(); } }
package com.yyn.nio.net; import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.net.Socket; import java.net.UnknownHostException; public class NIOSingleClient { public static void main(String[] args) throws UnknownHostException, IOException { // TODO Auto-generated method stub Socket socket = new Socket("127.0.0.1", 12003); OutputStreamWriter osw = new OutputStreamWriter(socket.getOutputStream(), "UTF-8"); PrintWriter out = new PrintWriter(osw); InputStreamReader isr = new InputStreamReader(new BufferedInputStream(System.in), "UTF-8"); BufferedReader in = new BufferedReader(isr); String data = ""; while(true){ data = in.readLine(); data = data.trim().toUpperCase(); if(data.equals("EIXT")){ out.close(); socket.close(); System.exit(0); } System.out.println("read data from comsole:" + data); out.println(data); out.flush(); System.out.println("sending data to server:" + data); } } }
dd