第一.NIO概述
java.nio 全称 java non-blocking IO,是指 JDK 提供的新 API。从 JDK1.4 开始,Java 提供了一系列改进的输入/输出的新特性,被统称为 NIO(即 New IO)。新增了许多用于处理输入输出
的类,这些类都被放在 java.nio 包及子包下,并且对原 java.io 包中的很多类进行改写,新增了满足 NIO 的功能。
NIO以块的方式处理数据,块IO的效率比流IO的效率高很多,NIO是非阻塞式的,使用它可以提供非阻塞的高伸缩性网络。
NIO主要有三大核心:Channel(通道)、Buffer(缓冲区)、Selector(选择器)。NIO是基于Channel和缓冲区进行操作的,数据是从通道读取到缓冲区,或者是缓冲区写入到通道中。
Selector(选择区)用于监听多个通道的事件(比如:连接请求、数据到达等),使用单个线程就可以监听到多个客户端通道
第二.NIO的三大核心
2.1.缓冲区Buffer
2.1.1Buffer操作API
缓冲区实际上是一个容器对象,更直接的说,其实就是一个数组,在 NIO 库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的; 在写入数据时,它也是写入到缓冲区中的;任何时候访问 NIO 中的数据,都是将它放到缓冲区中。Channel 提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由 Buffer,如下图所示:
在 NIO 中,所有的缓冲区类型都继承于抽象类 Buffer,最常用的就是 ByteBuffer,对于 Java 中的基本类型,基本都有一个具体 Buffer 类型与之相对应,它们之间的继承关系如下图所示:
ByteBuffer,存储字节数据到缓冲区
ShortBuffer,存储字符串数据到缓冲区
CharBuffer,存储字符数据到缓冲区
IntBuffer,存储整数数据到缓冲区
LongBuffer,存储长整型数据到缓冲区
DoubleBuffer,存储小数到缓冲区
FloatBuffer,存储小数到缓冲区
对于 Java 中的基本数据类型,都有一个 Buffer 类型与之相对应,最常用的自然是ByteBuffer 类(二进制数据),该类的主要方法如下所示:
public abstract ByteBuffer put(byte[] b); 存储字节数据到缓冲区
public abstract byte[] get(); 从缓冲区获得字节数据
public final byte[] array(); 把缓冲区数据转换成字节数组
public static ByteBuffer allocate(int capacity); 设置缓冲区的初始容量
public static ByteBuffer wrap(byte[] array); 把一个现成的数组放到缓冲区中使用
public final Buffer flip(); 翻转缓冲区,重置位置到初始位置
public final int capacity()获取缓冲区的容量
下面是使用ByteBuffer的简单例子:
public class BuffferDemo01 {
public static void main(String[] args) {
//分配新的 int 缓冲区,参数为缓冲区容量
// 新缓冲区的当前位置将为零,其界限(限制位置)将为其容量。它将具有一个底层实现数组,其数组偏移量将为零。
ByteBuffer byteBuffer = ByteBuffer.allocate(10);
for (int i = 0; i < byteBuffer.capacity(); i++) {
int j = 2*(i+1);
// 将给定整数写入此缓冲区的当前位置,当前位置递增
byteBuffer.put((byte) j);
}
// 重设此缓冲区,将限制设置为当前位置,然后将当前位置设置为 0
byteBuffer.flip();
//查看在当前位置和限制位置之间是否有元素
while (byteBuffer.hasRemaining()){
//读取此缓冲区当前位置的整数,然后当前位置递增
int j = byteBuffer.get();
System.out.print(j+" ");
}
}
}
2.1.2Buffer的基本原理
缓冲区对象本质上是一个数组,但它其实是一个特殊的数组,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况,如果使用 get()方法从缓冲区获取数据或者使用 put()方法把数据写入缓冲
区,都会引起缓冲区状态的变化。 在缓冲区中,最重要的属性有下面三个,它们一起合作完成对缓冲区内部状态的变化跟踪:
position:指定下一个将要被写入或者读取的元素索引,它的值由 get()/put()方法自动更新,在新创建一个 Buffer 对象时,position 被初始化为 0。
limit:指定还有多少数据需要取出(在从缓冲区写入通道时),或者还有多少空间可以放入数据(在从通道读入缓冲区时)。
capacity:指定了可以存储在缓冲区中的最大数据容量,实际上,它指定了底层数组的大小,或者至少是指定了准许我们使用的底层数组的容量。
以上三个属性值之间有一些相对大小的关系:0 <= position <= limit <= capacity。如果我们创建一个新的容量大小为20 的 ByteBuffer 对象,在初始化的时候,position 设置为 0,
limit 和 capacity 被设置为 10,在以后使用 ByteBuffer对象过程中,capacity 的值不会再发生变化,而其它两个个将会随着使用而变化。
代码演示:在当前目录准备一个demo.txt,输入内容:hell
public class NIODemo01 {
@Test
public void test01()throws Exception{
//文件输出通道
FileInputStream fis = new FileInputStream("demo.txt");
//获取通道
FileChannel channel = fis.getChannel();
//分配一个 10 个大小缓冲区,说白了就是分配一个 10 个大小的 byte 数组
ByteBuffer buffer = ByteBuffer.allocate(10);
output("初始化",buffer);
//先读取一下
channel.read(buffer);
output("调用read():",buffer);
//准备之前先锁定范围
buffer.flip();
output("调用 flip()", buffer);
//判断有没有读取的数据
while (buffer.remaining()>0){
byte b = buffer.get();
System.out.println("读取的数据:"+String.valueOf(b));
}
output("调用get():",buffer);
//解锁
buffer.clear();
output("调用 clear()", buffer);
//关闭通道
}
//把这个缓冲里面实时状态给打印出来
private void output(String step , ByteBuffer buffer) {
System.out.println(step+":");
//数组 容量 大小
System.out.print("capacity: " + buffer.capacity() + ", ");
//当前操作数据所在的位置,也可以叫做游标
System.out.println("position: "+buffer.position()+",");
//锁定值,flip,数据操作范围索引只能在 position - limit 之间
System.out.println("limit: " + buffer.limit());
System.out.println();
}
}
输出结果为:
下面呢对以上结果进行图解,四个属性值分别如图所示:
从通道中读取一些数据到缓冲区中,注意从通道读取数据,相当于往缓冲区中写入数据。如果读取 4 个自己
的数据,则此时 position 的值为 4,即下一个将要被写入的字节索引为 4,而 limit 仍然是 10,如下图所示:
下一步把读取的数据写入到输出通道中,相当于从缓冲区中读取数据,在此之前,必须调用 flip()方法,该方法将会完
成两件事情:
1. 把 limit 设置为当前的 position 值
2. 把 position 设置为 0
由于 position 被设置为 0,所以可以保证在下一步输出时读取到的是缓冲区中的第一个字节,而 limit 被设置为当前的
position,可以保证读取的数据正好是之前写入到缓冲区中的数据,如下图所示:
现在调用 get()方法从缓冲区中读取数据写入到输出通道,这会导致 position 的增加而 limit 保持不变,但 position 不
会超过 limit 的值,所以在读取我们之前写入到缓冲区中的 4 个自己之后,position 和 limit 的值都为 4,如下图所示:
在从缓冲区中读取数据完毕后,limit 的值仍然保持在我们调用 flip()方法时的值,调用 clear()方法能够把所有的状态变
化设置为初始化时的值,如下图所示:
2.2通道Channel
通道是一个对象,通过它可以读取和写入数据,当然了所有数据都通过 Buffer 对象来处理。不会将字节直接写入通道中,相反是将数据写入包含一个或者多个字节的缓冲区。
同样不会直接从通道中读取字节,而是将数据从通道读入缓冲区,再从缓冲区获取这个字节。 在 NIO 中,提供了多种通道对象,而所有的通道对象都实现了 Channel 接口。
常用的Channel类有:FileChannel、DatagramChannel、ServerSocketChannel和Socket。FileChannel、DatagramChannel用于UDP的数据读写,
ServerSocketChannel和SocketChannel用于TCP的数据库读写
以FileChannel类为例,该类主要用来本地文件进行IO操作该有的方法如下:
public void read(ByteBuffer dst),从通道读取数据并存放到缓冲区
public void writer(ByteBuffer dst)把缓冲区的数据写到通道
public long transferFrom(ReadableByteChannel src, long position, long count) 从目标通道复制数据到当前通道
public long transferTo(ReadableByteChannel src, long position, long count) 把数据从当前通道复制给目标通道
2.2.1案例
1.往本地写入数据
/**
* 写数据
*/
@Test
public void test01() throws Exception{
//写入内容
String content = "hell,NIO 写入文件";
//创建文件路径
FileOutputStream fos = new FileOutputStream("demo1.txt");
//获取通道
FileChannel channel = fos.getChannel();
//设置缓冲区
ByteBuffer byteBuffer= ByteBuffer.allocate(1024);
//像缓冲区写入内容
byteBuffer.put(content.getBytes());
byteBuffer.flip();
//把缓冲区的内容写入通道
channel.write(byteBuffer);
fos.close();
}
}
NIO 中的通道是从输出流对象里通过 getChannel 方法获取到的,该通道是双向的,既可以读,又可以写。在往通道里写数据之前,必须通过 put 方法把数据存到 ByteBuffer 中,然
后通过通道的 write 方法写数据。在 write 之前,需要调用filp()方法反转缓冲区,把内部重置的到初始位置,这样接下来才写数据时才能把所有数据写到通道里。运行效果如下所示:
2.从本地读取文件
/**
* 往本地读取数据
*/
@Test
public void test02() throws Exception{
//创建文件路径
FileInputStream fis = new FileInputStream("demo1.txt");
//获取通道
FileChannel channel = fis.getChannel();
//设置缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//读取数据到缓冲区
channel.read(byteBuffer);
String str = new String(byteBuffer.array());
System.out.println(str);
fis.close();
}
从输入流中获取一个通道,然后提供ByteBuffer缓冲区,该缓冲区的初始化容量很文件的大小一样,最后通过通道的read方法把数据读取出来并存储到了ByteBuffer中
3.复制文件
/**
* 复制视频文件
*/
@Test
public void test03() throws Exception{
FileInputStream fis =
new FileInputStream("E:\尚硅谷\22 SpringBoot整合篇\SpringBoot高级\视频\1、缓存-JSR107简介.avi");
FileOutputStream fos = new FileOutputStream("E:\1.avi");
FileChannel sourceChannel = fis.getChannel();
FileChannel destChannel = fos.getChannel();
destChannel.transferFrom(sourceChannel,0,sourceChannel.size());
destChannel.close();
sourceChannel.close();
}
2.3Selector选择器
2.3.1核心API
能够检测多个注册的通道上是否有事件发生,如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也
就是管理多个连接。这样使得只有在连接真正有读写事件发生时,才会调用函数来进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程,并
且避免了多线程之间的上下文切换导致的开销。
NIO 中实现非阻塞 I/O 的核心对象就是 Selector,Selector 就是注册各种 I/O 事件地方,而且当那些事件发生时,就是这个对象告诉我们所发生的事件,如下图所示:
从图中可以看出,当有读或写等任何注册的事件发生时,可以从 Selector 中获得相应的 SelectionKey,同时从 SelectionKey 中可以找到发生的事件和该事件所发生的具体的 SelectableChannel,
以获得客户端发送过来的数据。
使用 NIO 中非阻塞 I/O 编写服务器处理程序,大体上可以分为下面三个步骤:
1. 向 Selector 对象注册感兴趣的事件。
2. 从 Selector 中获取感兴趣的事件。
3. 根据不同的事件进行相应的处理。
该类的常用方法如下所示:
public static Selector open(),得到一个选择器对象
public int select(long timeout),监控所有注册的通道,当其中有 IO 操作可以进行时,将对应的 SelectionKey 加入到内部集合中并返回,参数用来设置超时时间
public Set<SelectionKey> selectedKeys(),从内部集合中得到所有的 SelectionKey
2. SelectionKey,代表了 Selector 和网络通道的注册关系,一共四种:
int OP_ACCEPT:有新的网络连接可以 accept,值为 16
int OP_CONNECT:代表连接已经建立,值为 8
int OP_READ 和 int OP_WRITE:代表了读、写操作,值为 1 和 4
该类的常用方法如下所示:
public abstract Selector selector(),得到与之关联的 Selector 对象
public abstract SelectableChannel channel(),得到与之关联的通道
public final Object attachment(),得到与之关联的共享数据
public abstract SelectionKey interestOps(int ops),设置或改变监听事件
public final boolean isAcceptable(),是否可以 accept
public final boolean isReadable(),是否可以读
public final boolean isWritable(),是否可以写
3. ServerSocketChannel,用来在服务器端监听新的客户端 Socket 连接,常用方法如下所示:
public static ServerSocketChannel open(),得到一个 ServerSocketChannel 通道
public final ServerSocketChannel bind(SocketAddress local),设置服务器端端口号
public final SelectableChannel configureBlocking(boolean block),设置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式
public SocketChannel accept(),接受一个连接,返回代表这个连接的通道对象
public final SelectionKey register(Selector sel, int ops),注册一个选择器并设置监听事件
4. SocketChannel,网络 IO 通道,具体负责进行读写操作。NIO 总是把缓冲区的数据写入通
道,或者把通道里的数据读到缓冲区。常用方法如下所示:
public static SocketChannel open(),得到一个 SocketChannel 通道
public final SelectableChannel configureBlocking(boolean block),设置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式
public boolean connect(SocketAddress remote),连接服务器
public boolean finishConnect(),如果上面的方法连接失败,接下来就要通过该方法完成
连接操作
public int write(ByteBuffer src),往通道里写数据
public int read(ByteBuffer dst),从通道里读数据
public final SelectionKey register(Selector sel, int ops, Object att),注册一个选择器并设置监听事件,最后一个参数可以设置共享数据
public final void close(),关闭通道
2.3.2入门案例
使用 NIO 开发一个入门案例,实现服务器端和客户端之间的数据通信(非阻塞)。
/**
* 客户端
*/
public class NIOClient {
public static void main(String[] args) throws Exception{
//得到一个网络通道
SocketChannel socketChannel = SocketChannel.open();
//设置非阻塞
socketChannel.configureBlocking(false);
//连接网络
InetSocketAddress address = new InetSocketAddress("localhost",8081);
//判断是否连接
if(!socketChannel.connect(address)){
while(!socketChannel.finishConnect()){
System.out.println("没有服务端进行连接");
}
}
//发送任务
String str = "hell Nio服务端";
ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes());
//写入通道
socketChannel.write(byteBuffer);
System.in.read();
}
}
/**
* 服务端
*/
public class NIOServer {
public static void main(String[] args) throws Exception{
//得到通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//得到selector对象
Selector selector = Selector.open();
//设置为非阻塞
serverSocketChannel.configureBlocking(false);
//设置端口
serverSocketChannel.bind(new InetSocketAddress(8081));
//注册到selector对象上
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true){
//监控客户端
if(selector.select(200)==0){
System.out.println("没有服务端连接");
continue;
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
//获取所有的监听对象
SelectionKey selectionKey = iterator.next();
//连接客户端
if(selectionKey.isAcceptable()){
//得到通道
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1023));
}
//读取数据
if(selectionKey.isReadable()){
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
socketChannel.read(buffer);
System.out.printf("客户端发来的数据:%s%n", new String(buffer.array()));
}
//删除防止重复发送
iterator.remove();
}
}
}
}
2.3.3网络聊天案例
客户端
/**
* 客户端
*/
public class ChatClient {
private SocketChannel socketChannel;
private String host = "127.0.0.1";
private Integer port = 8083;
private String userName;
public ChatClient(){
try {
//得到传输通道
socketChannel = SocketChannel.open();
//设置非阻塞
socketChannel.configureBlocking(false);
//设置网络连接
InetSocketAddress address = new InetSocketAddress(host,port);
//连接服务器
if (!socketChannel.connect(address)){
while (!socketChannel.finishConnect()){
System.out.println("client:没有服务端进行连接");
}
}
//得到客户端 IP 地址和端口信息,作为聊天用户名使用
userName = socketChannel.getLocalAddress().toString().substring(1);
System.out.println("---------------Client(" + userName + ") is ready---------------");
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 往服务端发送数据
*/
public void sendMsg(String msg){
try {
//如果控制台输入 bye 就关闭通道,结束聊天
if (msg.equalsIgnoreCase("bye")) {
socketChannel.close();
socketChannel = null;
return;
}
msg = userName + "说: " + msg;
ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
socketChannel.write(byteBuffer);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 接收服务端消息
*/
public void receiveMsg(){
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
try {
int count = socketChannel.read(byteBuffer);
if(count>=1){
String msg = new String(byteBuffer.array());
System.out.println(msg.trim());
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
TestClient
//启动聊天程序客户端
public class TestClient {
public static void main(String[] args) {
ChatClient chatClient = new ChatClient();
new Thread(()->{
while (true){
chatClient.receiveMsg();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
Scanner sc = new Scanner(System.in);
while (sc.hasNextLine()){
chatClient.sendMsg(sc.nextLine());
}
}
}
服务端
/**
* 聊天服务端
*/
public class ChatServer {
private ServerSocketChannel serverSocketChannel;
private Selector selector;
private int port = 8083;
public ChatServer(){
try {
//获取监听通道
serverSocketChannel = ServerSocketChannel.open();
//获取选择器
selector = Selector.open();
//设置非阻塞
serverSocketChannel.configureBlocking(false);
//绑定端口
serverSocketChannel.bind(new InetSocketAddress(port));
//将选择器绑定到监听通道并监听accept通道
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
printInfo("Chat Server is ready.......");
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 开始聊天
*/
public void startChat(){
while (true){
try {
if(selector.select(200)==0){
System.out.println("没有人上线:");
}
//获取被监听的accept
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while(iterator.hasNext()){
SelectionKey selectionKey = iterator.next();
//监听accept
if(selectionKey.isAcceptable()){
//获取通道
SocketChannel socketChannel = serverSocketChannel.accept();
//设置为非阻塞
socketChannel.configureBlocking(false);
//注册
socketChannel.register(selector,SelectionKey.OP_READ);
System.out.println(socketChannel.getRemoteAddress().toString().substring(1)+"上线了...");
}
//读取数据
if(selectionKey.isReadable()){
//获取通道
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
//读取数据
readMsg(socketChannel);
}
//防止重复
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 读取数据
* @param socketChannel
*/
private void readMsg(SocketChannel socketChannel) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
int count = socketChannel.read(buffer);
if (count>=1){
//打印数据
String msg = new String(buffer.array());
printInfo(new String(buffer.array()));
//广播消息
broadCast(socketChannel,msg);
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 广播消息
* @param socketChannel
*/
private void broadCast(SocketChannel socketChannel,String msg){
System.out.println("发送广播");
try {
//广播数据到所有的 SocketChannel 中
for (SelectionKey key : selector.keys()) {
Channel targetChannel = key.channel();
//排除自身
if(targetChannel instanceof SocketChannel &&targetChannel!=socketChannel){
SocketChannel destChannel = (SocketChannel) targetChannel;
//把数据存入到缓冲区
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
//往通道里面写数据
destChannel.write(buffer);
}
}
;
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 打印内容
* @param content
*/
private void printInfo(String content) {
SimpleDateFormat format = new SimpleDateFormat("yyyy-HH-dd HH:mm:ss");
System.out.println("["+format.format(new Date())+"]->"+content);
}
public static void main(String[] args) {
ChatServer server = new ChatServer();
server.startChat();
}
}