目录
NIO三大核心:Buffer、Channel、Selector
- 每个Channel都会对应一个Buffer
- Selector对应一个线程,多个Channel会注册到一个Selector上
- 程序切换到哪个Channel是由事件决定的,Selector会根据不同的事件,在各个Channel上切换
- Buffer就是一个内存块,底层是数组
- NIO数据的读取写入是通过Buffer,是双向的。而BIO的数据读写依靠的是输入输出流,是单向的(要么输入要么输出)
- Channel是双向的,可以更好地抽象底层操作系统的情况(底层的操作系统通道就是双向的)
Buffer
缓冲区其实就是数组的封装
七种基本数据类型中,除了boolean外,系统都有对应类型的Buffer:
- ByteBuffer
- CharBuffer
- ShortBuffer
- IntBuffer
- LongBuffer
- FloatBuffer
- DoubleBuffer
上述缓冲区的管理方式几乎是一直的,创建都是通过一个allocate方法获取的
缓冲区存取数据的两个核心方法
- put() 存入数据到缓冲区中
- get() 获取缓存冲区中的数据
缓冲区中的四个核心属性
- capacity 容量,表示缓冲区中最大存储数据的容量,一旦声明不能改变。
- limit 界限,表示缓冲区中可以操作数据的大小。(limit后面的数据不能进行读写)
- position 位置,表示缓冲区中正在操作的位置
- mark 标记,表示记录当前position的位置,可以通过reset()恢复到mark的位置
position <= limit <= capacity
直接缓冲区与非直接缓冲区
- 非直接缓冲器:通过allocate()方法分配,将缓冲区建立在JVM的内存中,属于堆内存
- 直接缓冲区:通过allocateDirect()方法分配,将缓冲区建立在物理内存中,可以提高效率,属于堆外内存,只有ByteBuffer有这个东西
代码示例
package day01;
import java.nio.ByteBuffer;
import org.junit.Test;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TestBuffer {
@Test
public void test_1() {
// 1. 分配一个指定大小的缓冲区
log.info("1. 分配一个指定大小的缓冲区");
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
log.info("capacity: {}", byteBuffer.capacity());
log.info("limit: {}", byteBuffer.limit());
log.info("position: {}", byteBuffer.position());
// 2. 使用put()存入数据到缓冲区
log.info("2. 使用put()存入数据到缓冲区");
byteBuffer.put("Hello Java".getBytes());
log.info("capacity: {}", byteBuffer.capacity());
log.info("limit: {}", byteBuffer.limit()); // limit不变
log.info("position: {}", byteBuffer.position()); // position增加10
// 3. 切换为读取数据模式
log.info("3. 切换为读取数据模式");
byteBuffer.flip();
log.info("capacity: {}", byteBuffer.capacity());
log.info("limit: {}", byteBuffer.limit()); // limit变为position的值
log.info("position: {}", byteBuffer.position()); // position归0
// 4. 使用get()读取缓冲区的数据
log.info("4. 使用get()读取缓冲区的数据");
byte[] dst = new byte[byteBuffer.limit()];
byteBuffer.get(dst, 3, 5);
log.info(new String(dst));
log.info("capacity: {}", byteBuffer.capacity());
log.info("limit: {}", byteBuffer.limit()); // limit不变
log.info("position: {}", byteBuffer.position()); // position增加到读取的位置
// 5. rewind()重复读数据
log.info("5. rewind()重复读数据");
byteBuffer.rewind();
log.info("capacity: {}", byteBuffer.capacity());
log.info("limit: {}", byteBuffer.limit()); // limit不变
log.info("position: {}", byteBuffer.position()); // position回到0
// 6. clear()清空缓冲区,但是缓冲区中的数据还在,会被重新添加的数据覆盖
log.info("6. clear()清空缓冲区");
byteBuffer.clear();
log.info("capacity: {}", byteBuffer.capacity());
log.info("limit: {}", byteBuffer.limit()); // limit回到capacity
log.info("position: {}", byteBuffer.position()); // position回到0
}
@Test
public void test_2() {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put("123456789".getBytes());
byteBuffer.flip();
byte[] dst = new byte[byteBuffer.limit()];
byteBuffer.get(dst, 0, 2);
log.info(new String(dst));
byteBuffer.mark();
dst = new byte[byteBuffer.limit()];
byteBuffer.get(dst, 2, 2);
log.info(new String(dst));
byteBuffer.reset();
dst = new byte[byteBuffer.limit()];
byteBuffer.get(dst, 2, 2);
log.info(new String(dst));
if (byteBuffer.hasRemaining()) {
log.info(String.valueOf(byteBuffer.remaining()));
}
}
@Test
public void test_3() {
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
log.info(String.valueOf(byteBuffer.isDirect()));
}
}
Channel
通道(Channel):用于源节点与目标节点的连接,在Java NIO中负责缓冲区中数据的传输。Channel本身不存储数据,因此需要配合缓冲区进行操作
通道的主要实现类
java.nio.channels.Channel接口
|- FileChannel
|- SocketChannel
|- ServerSocketChannel
|- DatagramChannel 用于UDP
获取通道
-
Java针对支持通道的类提供了getChannel()方法
本地IO:
FileInputStream/FileOutputStream
RandomAccessFile/
网络IO:
Socket
ServerSocket
DatagramSocket -
JDK1.7之后,针对各个通道,都提供了静态方法open()
-
JDK1.7之后,Files工具类的newByteChannel()方法
通道之间的数据传输
transferFrom()
transferTo()
分散(Scatter)与聚集(Gather)
分散读取(Scatting Reads):将通道中的数据分散到多个缓冲区中
聚集写入(Gathering Writes):将多个缓冲区的数据聚集到通道中
字符集:Charset
编码 与 解码
代码示例
package day01;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Map;
import java.util.Map.Entry;
import org.junit.Test;
public class TestChannel {
/**
* 利用通道完成文件的复制(非直接缓冲区)
* @throws Exception
*/
@Test
public void test_1() throws Exception {
FileInputStream fis = new FileInputStream("timg.jpg");
FileOutputStream fos = new FileOutputStream("timg_copy.jpg");
// 1. 获取通道
FileChannel inChannel = fis.getChannel();
FileChannel outChannel = fos.getChannel();
// 2. 分配指定大小的缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);
// 3. 将通道中的数据写入缓冲区
while (inChannel.read(buf) != -1) {
// 4. 将缓冲区中的数据写入通道
buf.flip(); // 切换为读取数据的模式
outChannel.write(buf);
buf.clear();
}
inChannel.close();
outChannel.close();
fis.close();
fos.close();
}
/**
* 使用直接缓冲区完成文件的复制(内存映射文件)
* @throws Exception
*/
@Test
public void test_2() throws Exception {
FileChannel inChannel = FileChannel.open(Paths.get("timg.jpg"), StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("timg_copy.jpg"), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE); // 存在就创建,不存在就报错
// 这就是一个内存映射文件,属于直接缓冲区
MappedByteBuffer inBuffer = inChannel.map(MapMode.READ_ONLY, 0, inChannel.size());
MappedByteBuffer outBuffer = outChannel.map(MapMode.READ_WRITE, 0, inChannel.size());
// 直接对缓冲区进行读写操作
byte[] dst = new byte[inBuffer.limit()];
inBuffer.get(dst);
outBuffer.put(dst);
inChannel.close();
outChannel.close();
}
/**
* 通道之间的数据传输
* 属于直接缓冲区
* @throws Exception
*/
@Test
public void test_3() throws Exception {
FileChannel inChannel = FileChannel.open(Paths.get("timg.jpg"), StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("timg_copy.jpg"), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE); // 存在就创建,不存在就报错
inChannel.transferTo(0, inChannel.size(), outChannel);
inChannel.close();
outChannel.close();
}
/**
*
* @throws Exception
*/
@Test
public void test_4() throws Exception {
RandomAccessFile raf = new RandomAccessFile("pom.xml", "rw");
RandomAccessFile waf = new RandomAccessFile("pom_copy.xml", "rw");
// 1. 获取通道
FileChannel rfc = raf.getChannel();
FileChannel wfc = waf.getChannel();
// 2. 分配指定大小缓冲区
ByteBuffer bf1 = ByteBuffer.allocate(100);
ByteBuffer bf2 = ByteBuffer.allocate(100);
// 3. 分散读取
ByteBuffer[] bfs = {bf1, bf2};
while ((rfc.read(bfs) != -1)) {
// 设置为读模式
for (int i = 0; i < bfs.length; i++) {
bfs[i].flip();
}
// 将buffer数据写入通道
wfc.write(bfs);
// 清空buffer
for (int i = 0; i < bfs.length; i++) {
bfs[i].clear();
}
}
rfc.close();
wfc.close();
raf.close();
waf.close();
}
@Test
public void test_5() throws Exception {
Map<String, Charset> charsetMap = Charset.availableCharsets();
for (Entry<String, Charset> entry : charsetMap.entrySet()) {
System.out.println("key: " + entry.getKey() + ", value: " + entry.getValue());
}
Charset cs1 = Charset.forName("GBK");
// 获取编码器
CharsetEncoder encoder = cs1.newEncoder();
// 获取解码器
CharsetDecoder decoder = cs1.newDecoder();
CharBuffer cb = CharBuffer.allocate(100);
cb.put("你好");
cb.flip();
// 编码操作
ByteBuffer bb = encoder.encode(cb);
// 解码操作
bb.flip(); // 所有的读操作之前都要进行模式切换
CharBuffer cb2 = decoder.decode(bb);
System.out.println(cb2.toString());
bb.flip(); // 所有的读操作之前都要进行模式切换
Charset cs2 = Charset.forName("UTF-8");
System.out.println(cs2.decode(bb).toString());
}
}
使用NIO完成网络通信
阻塞
package day02;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import org.junit.Test;
/**
* 一、使用NIO完成网络通信的三个核心:
*
* 1. 通道(Channel):负责连接
*
* java.io.channels.Channel
* |-SelectableChannel
* |-SocketChannel
* |-ServerSocketChannel
* |-DatagramChannel
*
* |-Pipe.SinkChannel
* |-Pipe.SourceChannel
*
* 2. 缓冲区(Buffer):负责数据的存取
*
* 3. 选择器(Selector):是SelectableChannel的多路复用器。用于监控SelectableChannel的IO状况
*
*
*/
public class TestBlockNIO {
@Test
public void testClient() throws Exception {
// 获取客户端通道
SocketChannel clientChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8899));
// 测试客户端发送文件
FileChannel fc = FileChannel.open(Paths.get("timg.jpg"), StandardOpenOption.READ);
// 创建一个字节缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(100);
// 循环读取数据到缓冲区
while (fc.read(byteBuffer) != -1) {
// 切换缓冲区为读模式
byteBuffer.flip();
// 将数据写入到客户端通道
clientChannel.write(byteBuffer);
// 清空缓冲区
byteBuffer.clear();
}
// 告诉服务端发送完毕,阻塞模式下必须这样才能继续运行
clientChannel.shutdownOutput();
// 接收服务端反馈
int len = 0;
while ((len = clientChannel.read(byteBuffer)) != -1) {
byteBuffer.flip();
System.out.println(new String(byteBuffer.array(), 0, len));
byteBuffer.clear();
}
// 关闭通道
fc.close();
clientChannel.close();
}
@Test
public void TestServer() throws Exception {
// 获取服务端通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 测试文件接收
FileChannel fc = FileChannel.open(Paths.get("timg_net.jpg"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
// 绑定服务端端口号
serverSocketChannel.bind(new InetSocketAddress(8899));
// 监听客户端通道
SocketChannel socketChannel = serverSocketChannel.accept();
// 创建缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(100);
// 循环读取数据
while (socketChannel.read(byteBuffer) != -1) {
// 切换缓冲区为读模式
byteBuffer.flip();
// 将数据写入到文件通道
fc.write(byteBuffer);
// 清空缓冲区
byteBuffer.clear();
}
// 发送反馈给客户端
byteBuffer.put("接收完毕".getBytes());
byteBuffer.flip();
socketChannel.write(byteBuffer);
byteBuffer.clear();
// 关闭通道
fc.close();
socketChannel.close();
serverSocketChannel.close();
}
}
非阻塞
package day02;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import org.junit.Test;
/**
* 一、使用NIO完成网络通信的三个核心:
*
* 1. 通道(Channel):负责连接
*
* java.io.channels.Channel
* |-SelectableChannel
* |-SocketChannel
* |-ServerSocketChannel
* |-DatagramChannel
*
* |-Pipe.SinkChannel
* |-Pipe.SourceChannel
*
* 2. 缓冲区(Buffer):负责数据的存取
*
* 3. 选择器(Selector):是SelectableChannel的多路复用器。用于监控SelectableChannel的IO状况
*
*
*/
public class TestNoneBlockNIO {
@Test
public void client() throws Exception {
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8899));
// 切换为非阻塞模式
socketChannel.configureBlocking(false);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put(scanner.next().getBytes());
byteBuffer.flip();
socketChannel.write(byteBuffer);
}
scanner.close();
socketChannel.close();
}
@Test
public void server() throws Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 切换为非阻塞模式
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(8899));
// 获取选择器
Selector selector = Selector.open();
// 将通道户注册到选择器上,并指定监听某个事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 轮询式的获取选择器上已经准备就绪的事件
while (selector.select() > 0) {
// 获取当前选择器中所有注册的 已就绪监听事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
// 获取准备准备就绪的事件
SelectionKey selectionKey = iterator.next();
iterator.remove();
// 如果是接收事件就绪
if (selectionKey.isAcceptable()) {
// 获取客户端连接
SocketChannel socketChannel = serverSocketChannel.accept();
// 切换为非阻塞模式
socketChannel.configureBlocking(false);
// 将通道注册到选择器上
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
// 获取当前接收器上的读就绪状态
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len = 0;
while ((len = socketChannel.read(buffer)) > 0) {
buffer.flip();
System.out.println(":::" + new String(buffer.array(), 0, len));
buffer.clear();
}
}
}
}
}
}
管道
是单向的,这边发,另一边收
package day02;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.util.Scanner;
import org.junit.Test;
public class TestPipe {
private Pipe pipe;
{
try {
pipe = Pipe.open();
} catch (IOException e) {
e.printStackTrace();
}
}
@Test
public void test() throws Exception {
test_1();
test_2();
Thread.currentThread().join();
}
public void test_1() {
new Thread(() -> {
try {
Pipe.SinkChannel sinkChannel = pipe.sink();
Scanner scanner = new Scanner(System.in);
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (scanner.hasNext()) {
buffer.put(scanner.next().getBytes());
buffer.flip();
sinkChannel.write(buffer);
buffer.clear();
}
scanner.close();
sinkChannel.close();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
public void test_2() {
new Thread(() -> {
try {
Pipe.SourceChannel sourceChannel = pipe.source();
ByteBuffer buffer = ByteBuffer.allocate(1024);
// != -1 或者 > 0 的写法都可以
while((sourceChannel.read(buffer)) != -1) {
buffer.flip();
System.out.println(new String(buffer.array(), 0, buffer.limit()));
buffer.clear();
}
sourceChannel.close();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
UDPNIO
package day02;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Scanner;
import org.junit.Test;
public class TestUDPNIO {
@Test
public void client() throws Exception {
DatagramChannel datagramChannel = DatagramChannel.open();
datagramChannel.configureBlocking(false);
Scanner scanner = new Scanner(System.in);
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (scanner.hasNext()) {
// 不考虑buffer长度不够的情况
buffer.put(scanner.next().getBytes());
buffer.flip();
datagramChannel.send(buffer, new InetSocketAddress("127.0.0.1", 8899));
buffer.clear();
}
scanner.close();
datagramChannel.close();
}
@Test
public void server() throws Exception {
DatagramChannel datagramChannel = DatagramChannel.open();
datagramChannel.configureBlocking(false);
datagramChannel.bind(new InetSocketAddress(8899));
Selector selector = Selector.open();
datagramChannel.register(selector, SelectionKey.OP_READ);
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (selector.select() > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while(iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
if (selectionKey.isReadable()) {
// 不考虑buffer长度不够的情况
datagramChannel.receive(buffer);
buffer.flip();
System.out.println(new String(buffer.array(), 0, buffer.limit()));
buffer.clear();
}
}
}
}
}