java I/O
I/O模型
前置知识
- 什么是同步与异步?
- 什么是阻塞与非阻塞?
- 什么是阻塞I/O与非阻塞I/O?
- 什么是同步I/O与异步I/O?
什么是同步与异步?
同步是指多个任务一起执行时,任务必须逐个完成,一个任务执行时会导致其他任务和整个流程的暂时等待。
异步是指多任务同时执行,不会导致其他任务或者整个流程处于暂停状态。
同步和异步的区别就是,执行多个任务时,一个任务的执行会不会导致其他任务的暂时暂停。
什么是阻塞与非阻塞?
阻塞:当一个任务执行时且执行的条件不满足的时候会一直等待直到条件的满足。
非阻塞:当一个任务执行且条件不满足时会返回一个指示标志(告知条件不满足),而不是一直等待。
阻塞与非阻塞的区别就是:当一个任务执行且条件不满足时,是一直等待还是返回一个指示标志。
什么是阻塞I/O与非阻塞I/O?
i/o操作分为两个步骤:
- 检查数据是否就绪
- 内核将数据拷贝到线程中
阻塞I/O和非阻塞I/O区别就在第一步,阻塞I/O检查到数据未就绪就会一直等待知道数据准备就绪。而非阻塞I/O遇到数据为就绪就返回一个指示标志,告诉线程数据没有准备就绪。
什么是同步I/O与异步I/O?
同步I/O和异步I/O的区别就在于第二个步骤,同步I/O第二个步骤是线程完成的(会使当前线程阻塞,去完成I/O操作,把数据从内核拷贝到线程),而异步I/O第二个步骤是由内核完成的。
一个同步I/O操作会导致线程被阻塞,直到I/O操作完成。
而一个异步I/O操作不会导致发出请求的线程进入阻塞状态。
5种I/O模型
阻塞I/O模型
线程提出I/O请求之后,数据如果没有就绪那么线程就会交出CPU,进入阻塞状态,数据准备完毕后再将数据从内核态复制给线程,线程才进入就绪状态。

非阻塞I/O模型
线程发出I/O请求之后,数据没有准备就绪,会返回一个指示标志发告诉线程数据没有准备就绪,但是线程不会交出CPU,而是不断地从用户带切换到内核状态,不断地询问数据是否准备就绪。这样会很浪费CPU时间。

I/O复用模型
I/O复用模型阻塞与select调用,select可以监听多个连接,当一个连接中的数据就绪的时候,再调用read操作将数据从内核复制给线程。不一定所有的情况都是I/O复用模型比阻塞式I/O快,因为I/O复用模型要进行两次系统调用,但是i/o复用模型的优势在于同时监听多个连接.


信号驱动I/O
在信号驱动IO模型中,当用户线程发起一个IO请求操作,会给对应的socket注册一个信号函数,然后用户线程会继续执行,当内核数据就绪时会发送一个信号给用户线程,用户线程接收到信号之后,便在信号函数中调用IO读写操作来进行实际的IO请求操作。

异步IO模型
异步IO模型才是最理想的IO模型,在异步IO模型中,当用户线程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从内核的角度,当它受到一个asynchronous read之后,它会立刻返回,说明read请求已经成功发起了,因此不会对用户线程产生任何block。然后,内核会等待数据准备完成,然后将数据拷贝到用户线程,当这一切都完成之后,内核会给用户线程发送一个信号,告诉它read操作完成了。也就说用户线程完全不需要实际的整个IO操作是如何进行的,只需要先发起一个请求,当接收内核返回的成功信号时表示IO操作已经完成,可以直接去使用数据了。
也就说在异步IO模型中,IO操作的两个阶段都不会阻塞用户线程,这两个阶段都是由内核自动完成,然后发送一个信号告知用户线程操作已完成。用户线程中不需要再次调用IO函数进行具体的读写。这点是和信号驱动模型有所不同的,在信号驱动模型中,当用户线程接收到信号表示数据已经就绪,然后需要用户线程调用IO函数进行实际的读写操作;而在异步IO模型中,收到信号表示IO操作已经完成,不需要再在用户线程中调用iO函数进行实际的读写操作。异步IO是需要操作系统的底层支持,在Java 7中,提供了Asynchronous IO。

五种I/O模型的比较

由图片可知。前面4种I/O模型 阻塞i/o 非阻塞i/o i/o复用 信号驱动i/o 都是同步的。
java_BIO
简介
java的传统IO都属于BIO,也就是同步阻塞式I/O模型,在BIO的模式下,注定了一个线程对应一个Client,当线程知道数据未准备就绪的时线程就会进入阻塞式状态,这就是阻塞式IO。
编程实现
需求:使用多线程实现一个线程对应一个客户端请求
Server端实现
public class Server {
public static void main(String[] args) {
try {
ServerSocket ss = new ServerSocket(9999);
System.out.println("==服务端启动==");
while (true) {
Socket socket = ss.accept();
//服务端创建一个单独的线程来处理接受到的用户连接请求
new ServerThreadReader(socket).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class ServerThreadReader extends Thread {
private Socket socket;
public ServerThreadReader(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
InputStream is = socket.getInputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String msg = null;
while ((msg = br.readLine()) != null) {
System.out.println("服务端接收到:" + msg);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
Client端实现
public class Client {
public static void main(String[] args) {
try {
Socket socket = new Socket("127.0.0.1", 9999);
OutputStream os = socket.getOutputStream();
PrintStream ps = new PrintStream(os, true);
Scanner scanner = new Scanner(System.in);
while (true) {
System.out.print("请输入消息:");
String msg = scanner.nextLine();
ps.println(msg);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
编程结果:
客户端发出socket连接,服务端接收到后,也会创建一个socket并会创建一个线程来处理这个socket,这样客户端和服务端线程就可以通过这两个socket进行通信了。
BIO模式下的缺点
- 一个客户端对应一个线程,当用户请求连接后,什么都不做,也需要开启一个线程来服务一个客户,造成资源的浪费。
- 当客户多的时候,服务器会创建很多的线程进行服务,这样会使得无服务器压力太大,造成线程栈溢出,服务器宕机等问题
- 线程之间频繁的切换上下文影响了服务器的性能
线程池技术防止线程栈溢出
简介
服务端接收到的用户请求,把请求接收到的socket对象封装成Runnable对象,会先把Runnable放到任务队列中,如果线程池中有空闲线程则处理任务队列中的任务,没有空闲线程则任务队列中的任务则需要等待。
编程实现
线程池
public class HandleSocketThreadPool {
//线程池
ExecutorService executorService;
public HandleSocketThreadPool(int maxPoolSize, int queueSize) {
/*
corePoolSize - 即使空闲时仍保留在池中的线程数
maximumPoolSize - 池中允许的最大线程数
keepAliveTime - 当线程数大于核心时,这是多余的空闲线程在终止之前等待新任务的最大时间。
unit - keepAliveTime参数的时间单位
workQueue - 在执行任务之前用于保存任务的队列。 该队列将仅保存execute方法提交的Runnable任务。
*/
this.executorService = new ThreadPoolExecutor(3,maxPoolSize,120, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(queueSize));
}
public void execute(Runnable target){
executorService.execute(target);
}
}
Server
public class Server {
public static void main(String[] args) {
try {
ServerSocket ss = new ServerSocket(9999);
HandleSocketThreadPool threadPool = new HandleSocketThreadPool(3, 4);
int count = 1;
while (count < 100) {
Socket socket = ss.accept();
System.out.println("第 " + count + " 个客户端发起请求");
count++;
//使用线程池接收这个socket
/*
1. 先把socket包装成一个Runnable对象
2. 把这个runnable对象交给线程池管理
*/
ReadClientRunnable runnable = new ReadClientRunnable(socket);
threadPool.execute(runnable);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class ReadClientRunnable implements Runnable{
private final Socket socket;
public ReadClientRunnable(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
InputStream is = socket.getInputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String msg = null;
while ((msg = br.readLine()) != null) {
System.out.println("服务端接收到:" + msg);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
总结
- 使用线程池技术处理用户请求,即使用户请求过多也不会导致线程栈溢出,但是本质没变,同一时间还是一个线程处理一个请求。还是同步阻塞模型。
java_NIO
基础知识
java实现NIO主要靠三个类:Buffer, Channel, Selector。Buffer是存放数据的地方,channel是数据传输通道不存放数据,通过与buffer交互(从buffer中读取数据写入通道或者写入数据到buffer),selector是管理多个通道的。
Buffer
简介
NIO中关键的Buffer有 CharBuffer, ByteBuffer, ShortBuffer, IntBuffer, LongBuffer, FloatBuffer, DoubleBuffer 对应七种基本数据类型。还有MappedByteBuffer, HeapByteBuffer, DirectByteBuffer等。
通过它们自己的静态方法创建对象
static XxxBuffer allocate(int capacity) : 创建一个容量为capacity 的 XxxBuffer 对象
基本属性
容量 (capacity) :作为一个内存块,Buffer具有一定的固定大小,也称为"容量",缓冲区容量不能为负,并且创建后不能更改。
限制 (limit):表示缓冲区中可以操作数据的大小(>=limit 的位置数据不能进行读写)。缓冲区的限制不能为负,并且不能大于其容量。 写入模式,限制等于buffer的容量。读取模式下,limit等于写入的数据量。
位置 (position):下一个要读取或写入的数据的索引。缓冲区的位置不能为 负,并且不能大于其限制
标记 (mark)与重置 (reset):标记是一个索引,通过 Buffer 中的 mark() 方法 指定 Buffer 中一个特定的 position,之后可以通过调用 reset() 方法恢复到这 个 position.
**图解属性: **

常见方法
Buffer clear() 清空缓冲区并返回对缓冲区的引用
Buffer flip() 为 将缓冲区的界限设置为当前位置,并将当前位置充值为 0
int capacity() 返回 Buffer 的 capacity 大小
boolean hasRemaining() 判断缓冲区中是否还有元素
int limit() 返回 Buffer 的界限(limit) 的位置
Buffer limit(int n) 将设置缓冲区界限为 n, 并返回一个具有新 limit 的缓冲区对象
Buffer mark() 对缓冲区设置标记
int position() 返回缓冲区的当前位置 position
Buffer position(int n) 将设置缓冲区的当前位置为 n , 并返回修改后的 Buffer 对象
int remaining() 返回 position 和 limit 之间的元素个数
Buffer reset() 将位置 position 转到以前设置的 mark 所在的位置
Buffer rewind() 将位置设为为 0, 取消设置的 mark
clear和compact的区别:
clear会把position的位置设置为0,limit设置为capacity。但是原来的数据不会被清空,只是有新数据来的时候会被覆盖。
compact是会保留未被读取的数据,将未被读取的数据往前挪,position指向未被读取的最后一个数据的下一个位置。limit会被设置为capacity。
读写方法
Buffer 所有子类提供了两个用于数据操作的方法:get()put() 方法
取获取 Buffer中的数据
get() :读取单个字节
get(byte[] dst):批量读取多个字节到 dst 中
get(int index):读取指定索引位置的字节(不会移动 position)
放到 入数据到 Buffer 中 中
put(byte b):将给定单个字节写入缓冲区的当前位置
put(byte[] src):将 src 中的字节写入缓冲区的当前位置
put(int index, byte b):将指定字节写入缓冲区的索引位置(不会移动 position)
读写Buffer的时候遵循的规则
- 读之前先使用 buffer.flip() 方法切换成读模式
- 读取之后调用 clear() 或者 compact()
Channel
简介
Channel可以理解为数据传输的通道,Channel和IO中的Stream(流)是差不多一个等级的。只不过Stream是单向的,譬如:InputStream, OutputStream.而Channel是双向的,既可以用来进行读操作,又可以用来进行写操作。
NIO中的Channel的主要实现有:
- FileChannel 通过字节流对象获取
- DatagramChannel
- SocketChannel
- ServerSocketChannel
使用nio从客户端传入数据给服务端的过程如下
常用方法
int read(ByteBuffer dst) 从 从 Channel 到 中读取数据到 ByteBuffer
long read(ByteBuffer[] dsts) 将 将 Channel 到 中的数据“分散”到 ByteBuffer[]
int write(ByteBuffer src) 将 将 ByteBuffer 到 中的数据写入到 Channel
long write(ByteBuffer[] srcs) 将 将 ByteBuffer[] 到 中的数据“聚集”到 Channel
long position() 返回此通道的文件位置
FileChannel position(long p) 设置此通道的文件位置
long size() 返回此通道的文件的当前大小
FileChannel truncate(long s) 将此通道的文件截取为给定大小
void force(boolean metaData) 强制将所有对此通道的文件更新写入到存储设备中
案例
使用Channel和Buffer进行文件的复制
public void copyFile() {
try {
FileInputStream fis = new FileInputStream("src//com//yogurt//nio//b_nio_channel//壁纸.jpg");
FileOutputStream fos = new FileOutputStream("src//com//yogurt//nio//b_nio_channel//壁纸new.jpg");
FileChannel fisChannel = fis.getChannel();
FileChannel fosChannel = fos.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
while ((fisChannel.read(buffer)) != -1) {
//转换成读模式
buffer.flip();
fosChannel.write(buffer);
//这里一定要清空缓存区
buffer.clear();
}
fosChannel.close();
fisChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
selector
简介
selector是单个线程可以处理多个Channel的关键,他会把channel和该channel感兴趣的事件注入到selector中,调用select() 进入阻塞状态,直到有channel感兴趣的事件准备就绪就返回就绪事件的个数,然后线程进入就绪状态,等待处理。
方法
通过静态方法open()创建对象
Selector selector = Selector.open();
向选择器注册通道:SelectableChannel.register(Selector sel, int ops)
//1. 获取通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
//2. 切换非阻塞模式
ssChannel.configureBlocking(false);
//3. 绑定连接
ssChannel.bind(new InetSocketAddress(9999));
//4. 获取选择器
Selector selector = Selector.open();
//5. 将通道注册到选择器上, 并且指定“监听接收事件”
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
当调用 register(Selector sel, int ops) 将通道注册选择器时,选择器对通道的监听事件,需要通过第二个参数 ops 指定。可以监听的事件类型(用 可使用 SelectionKey 的四个常量 表示):
- 读 : SelectionKey.OP_READ (1)
- 写 : SelectionKey.OP_WRITE (4)
- 连接 : SelectionKey.OP_CONNECT (8)
- 接收 : SelectionKey.OP_ACCEPT (16)
- 若注册时不止监听一个事件,则可以使用“位或”操作符连接。
int interestSet = SelectionKey.OP_READ|SelectionKey.OP_WRITE
案例演示流程
服务端流程
public class Server {
public static void main(String[] args) throws IOException {
//1. 创建ServerSocketChannel 用来绑定端口和接收客户端的请求
ServerSocketChannel ssChannel = ServerSocketChannel.open();
//2. 配置成阻塞模式
ssChannel.configureBlocking(false);
//3. 绑定端口
ssChannel.bind(new InetSocketAddress(9999));
System.out.println("==服务端准备就绪==");
//4. 创建selector
Selector selector = Selector.open();
//5. 把ssChanel注册到selector中,定义事件类型为接受事件
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
//6. 轮询selector处理就绪事件
while (selector.select() > 0){
//获取到所有准备好的事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
System.out.println("有新事件准备就绪了~~");
//轮询就绪事件
while (iterator.hasNext()) {
//获取就绪事件
SelectionKey selectionKey = iterator.next();
//判断事件类型是否是接收
if (selectionKey.isAcceptable()) {
//通过ssChannel获取客户端的连接
SocketChannel channel = ssChannel.accept();
//转换成非阻塞模式
channel.configureBlocking(false);
//把客户端的连接注册到selector中,注册成读操作
channel.register(selector,SelectionKey.OP_READ);
//判断是否是客户端发数据过来了
}else if (selectionKey.isReadable()){
//获取和客户端对应的channel
SocketChannel channel = (SocketChannel)selectionKey.channel();
//读取数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len;
//channel向缓冲区写入数据
while ((len = channel.read(buffer)) > 0) {
buffer.flip();
System.out.println(new String(buffer.array(), 0, len));
buffer.clear();
}
}
//清空事件,表示就绪事件都已经处理好了
iterator.remove();
}
}
}
}
客户端流程
public class Client {
public static void main(String[] args) throws IOException {
//创建SocketChannel,会被selector轮询到,然后被ssChanel.accept接收注册进selector
SocketChannel channel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999));
channel.configureBlocking(false);
//创建Buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String msg = scanner.nextLine();
buffer.put(("iandf say :" + msg).getBytes());
buffer.flip();
//从buffer中读取数据到客户端channel
channel.write(buffer);
buffer.clear();
}
}
}
组合案例~群聊
使用NIO实现群聊
public class Server {
private ServerSocketChannel ssChannel;
private Selector selector;
private final int PORT = 9999;
public Server() {
try {
//初始化
selector = Selector.open();
ssChannel = ServerSocketChannel.open();
//定义为非阻塞模式
ssChannel.configureBlocking(false);
//绑定端口号
ssChannel.bind(new InetSocketAddress(PORT));
//注册感兴趣的事件
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
/*
需求:
监听客户端的请求
接收客户端发过来的消息,转发给所有其他的客户端
*/
public static void main(String[] args) {
Server server = new Server();
server.listen();
}
private void listen() {
try {
System.out.println("监听线程: " + Thread.currentThread().getName());
//让selector进行轮询
while (selector.select() > 0) {
//拿到所有的事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
//如果事件是客户端有请求连接服务端
if (selectionKey.isAcceptable()) {
SocketChannel channel = ssChannel.accept();
channel.configureBlocking(false);
System.out.println(channel.getRemoteAddress() + " 上线 ");
channel.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
System.out.println("收到客户端发送数据的事件");
handleRead(selectionKey);
}
}
//表示本次selector监听到的事件都处理完成了
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 通过监听到的事件拿到数据
* @param selectionKey 监听到的事件
*/
private void handleRead(SelectionKey selectionKey) {
//拿到对应的channel
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
//将channel中的数据读入buffer
int len = channel.read(buffer);
if (len > 0) {
buffer.flip();
String msg = new String(buffer.array(), 0, len);
System.out.println(msg);
//发送消息给所有的客户
sendMsgToAll(msg, channel);
buffer.clear();
}
} catch (IOException e) {
try {
System.out.println(channel.getRemoteAddress() + " 关闭了连接");
//取消这次事件
selectionKey.cancel();
//关闭通道
channel.close();
} catch (IOException ioException) {
ioException.printStackTrace();
}
e.printStackTrace();
}
}
/**
* 把 msg 发送给其他的客户端
* @param msg 从当前通道里面获取的消息
* @param self 当前通道
*/
private void sendMsgToAll(String msg, SocketChannel self) throws IOException {
System.out.println("准备发送该消息给所有用户");
System.out.println(selector.keys().size());
for (SelectionKey selectedKey : selector.keys()) {
SelectableChannel channel = selectedKey.channel();
if (channel instanceof SocketChannel && channel != self) {
//从buffer中向通道里面写出数据
SocketChannel dest = (SocketChannel) channel;
System.out.println(dest.getRemoteAddress());
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
dest.write(buffer);
}
}
}
}
public class Client {
private Selector selector;
private SocketChannel channel;
private final String ADDRESS = "127.0.0.1";
private final int PORT = 9999;
public Client() {
try {
selector = Selector.open();
//绑定服务端的地址和端口,发送请求
channel = SocketChannel.open();
channel.connect(new InetSocketAddress(ADDRESS, PORT));
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
Client client = new Client();
//开启一个线程,接收服务端发过来的消息
new Thread(client::readInfo).start();
//从控制台读取数据发送给服务端
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
//向buffer中写入数据
try {
System.out.println(ByteBuffer.wrap((Thread.currentThread().getName() + msg).getBytes()));
//buffer写出数据到channel
client.channel.write(ByteBuffer.wrap((client.channel.getLocalAddress().toString().substring(1) + " say: " + msg).getBytes()));
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void readInfo() {
try {
while (selector.select() > 0) {
System.out.println("有数据来了");
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isReadable()) {
SocketChannel channel = (SocketChannel) selectionKey.channel();
//申请缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
buffer.flip();
//将channel里面的数据读取到buffer
System.out.println(new String(buffer.array()));
}
}
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
NIO和BIO的思考
BIO的优点:代码简单,清晰,易于维护,适用于连接数目较小的场景。
BIO的缺点:一个线程对应一个连接,当数据未就绪时就会进入线程阻塞状态,直到数据拷贝到线程,线程才会进入就绪状态。这样即使有的客户连接进入服务器什么也不做就等于大大浪费了服务器的资源,且没办法适应于大规模的并发场景。随着客户数量的增多会发生线程栈的溢出,进而可能导致服务器宕机等。
NIO的优点:使用I/O复用模型,使一个线程就能同时处理多个连接,不会导致一个连接的数据未就绪就导致线程阻塞,内核会轮询每一个连接,直到有感兴趣的事件发生,内核会通知线程,线程进行系统调用进入阻塞状态,知道数据从内核拷贝到线程,线程就进入就绪状态。适用于多连接且I/O操作不耗时的场景
NIO的缺点:一个线程处理多个连接,当一个连接的I/O操作耗时很长的情况下,其他的连接可能迟迟都无法得到响应。而且NIO的代码不易懂,难以维护和升级,可以使用已经封装好的框架,比如Netty。