Channel 是 NIO 的核心概念,它表示一个打开的连接,这个连接可以连接到 I/O 设备(例如:磁盘文件,Socket)或者一个支持 I/O 访问的应用程序。Java NIO 使用缓冲区和通道来进行数据传输。
一个通道在创建的时候被打开,可以调用 isOpen() 来判断一个通道是否是打开状态。关闭通道使用 close() 方法,一个通道一旦被关闭,将不能被重新打开。
1. 基于缓冲区 Buffer 和通道 Channel 的数据交互
应用程序可以通过与 I/O 设备建立通道来实现对 I/O 设备的读写操作,操作的数据通过缓冲区 Buffer 来进行交互。
从 I/O 设备读取数据时:
1)应用程序调用通道 Channel 的 read() 方法;
2)通道往缓冲区 Buffer 中填入 I/O 设备中的数据,填充完成之后返回;
3)应用程序从缓冲区 Buffer 中获取数据。
往 I/O 设备写数据时:
1)应用程序往缓冲区 Buffer 中填入要写到 I/O 设备中的数据;
2)调用通道 Channel 的 write() 方法,通道将数据传输至 I/O 设备。
2. NIO 中主要的通道类型与操作
这里仅讨论磁盘文件和网络套接字的 I/O 通道,在整个 NIO 的学习中,直接内存映射相关内容一般指的是磁盘文件 I/O,而 I/O 多路复用模型和选择器则一般指网络I/O。磁盘文件通道为 FileChannel,网络套接字通道有 TCP 相关的 SocketChannel,ServerSocketChannel 和 UDP 相关的 DatagramChannel。
2.1 FileChannel
文件通道可以连接一个文件,然后对文件进行读,写,映射到直接内存。使用文件通道操作文件的一般流程为:
1)获取通道。文件通道通过 FileChannel 的静态方法 open() 来获取,获取时需要指定文件路径和文件打开方式。
FileChannel.open(Paths.get(fileName), StandardOpenOption.READ);// 获取文件通道
2)创建字节缓冲区。文件相关的字节缓冲区有两种,一种是基于堆的 HeapByteBuffer,另一种是基于文件映射,放在堆外内存中的 MappedByteBuffer。这里使用前者,后者相关内容可以参考:Java NIO 文件通道 FileChannel 用法。
ByteBuffer buf = ByteBuffer.allocate(10); // 分配字节缓存
3)读写操作。
读取数据。一般需要一个循环结构来读取数据,读取数据时需要注意切换 ByteBuffer 的读写模式。
while (channel.read(buf) != -1){ // 读取通道中的数据,并写入到 buf 中
buf.flip(); // 缓存区切换到读模式
while (buf.position() < buf.limit()){ // 读取 buf 中的数据
text.append((char)buf.get());
}
buf.clear(); // 清空 buffer,缓存区切换到写模式
}
写入数据。
for (int i = 0; i < text.length(); i++) {
buf.put((byte)text.charAt(i)); // 填充缓冲区,需要将 2 字节的 char 强转为 1 自己的 byte
if (buf.position() == buf.limit() || i == text.length() - 1) { // 缓存区已满或者已经遍历到最后一个字符
buf.flip(); // 将缓冲区由写模式置为读模式
channel.write(buf); // 将缓冲区的数据写到通道
buf.clear(); // 清空缓存区,将缓冲区置为写模式,下次才能使用
}
}
4)将数据刷出到物理磁盘。FileChannel 的 force(boolean metaData) 方法可以确保对文件的操作能够更新到磁盘。metaData 为 true 表示不仅要刷出数据,还要刷入文件的元数据,如:修改时间。
channel.force(false);
5)关闭通道。
channel.close();
下面给出一个文件通道的具体示例。示例中 writeText() 将字符串写入到文件当中,然后 readText() 再将内容读出来。这里为了简单起见,示例代码中字符串只能包含 ASCII 字符,而不能包含中文字或其它特殊字符;否则会乱码。
public class FileChannelReadWrite {
public static void main(String[] args) throws IOException {
String fileName = "data.txt";
String text = "Hello, welcome to Robothy's blog.";
writeText(fileName, text);
System.out.println(readText(fileName));
}
static String readText(String fileName) throws IOException {
FileChannel channel = FileChannel.open(Paths.get(fileName), StandardOpenOption.READ);// 获取文件通道
ByteBuffer buf = ByteBuffer.allocate(10); // 分配字节缓存
StringBuilder text = new StringBuilder();
while (channel.read(buf) != -1){ // 读取通道中的数据,并写入到 buf 中
buf.flip(); // 缓存区切换到读模式
while (buf.position() < buf.limit()){ // 读取 buf 中的数据
text.append((char)buf.get());
}
buf.clear(); // 清空 buffer,缓存区切换到写模式
}
channel.close(); // 关闭通道
return text.toString();
}
static void writeText(String fileName, String text) throws IOException {
// 获取文件通道
FileChannel channel = FileChannel.open(Paths.get(fileName), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
ByteBuffer buf = ByteBuffer.allocate(10); // 创建字节缓冲区
for (int i = 0; i < text.length(); i++) {
buf.put((byte)text.charAt(i)); // 填充缓冲区,需要将 2 字节的 char 强转为 1 自己的 byte
if (buf.position() == buf.limit() || i == text.length() - 1) { // 缓存区已满或者已经遍历到最后一个字符
buf.flip(); // 将缓冲区由写模式置为读模式
channel.write(buf); // 将缓冲区的数据写到通道
buf.clear(); // 清空缓存区,将缓冲区置为写模式,下次才能使用
}
}
channel.force(false); // 将数据刷出到磁盘,不刷出文件元数据
channel.close(); // 关闭通道
}
}
关于 FileChannel 的更多详细用法:Java NIO 文件通道 FileChannel 用法。
2.2 SocketChannel
SocketChannel 负责 TCP 套接字的连接和数据传输,客户端和服务端都需要用到。SocketChannel 是线程安全的,支持多线程访问。
SocketChannel 有阻塞连接和非阻塞连接两种模式。对于阻塞连接,读取数据时会阻塞,直到有数据过来或者连接被关闭;对于非阻塞连接,调用 read() 方法时无论是否有数据都会立即返回。可以调用 configureBlocking(boolean block) 来配置为阻塞通道或非阻塞通道。
SocketChannel 可以由服务端或者客户端发起关闭。假设客户端在写数据时,服务端关闭了连接,客户端 write() 方法会抛出 AsynchronousCloseException;假设客户端在读取数据时,服务端关闭了连接,read() 方法会立即返回 -1
,此时缓冲区中没有内容。
TCP 客户端使用 SocketChannel 与服务端进行交互的流程为:
1)打开通道,连接到服务端。
SocketChannel channel = SocketChannel.open(); // 打开通道,此时还没有打开 TCP 连接
channel.connect(new InetSocketAddress("localhost", 9090)); // 连接到服务端
这两句也可以合并起来写。
SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 9090));
2)分配缓冲区。
ByteBuffer buf = ByteBuffer.allocate(10); // 分配一个 10 字节的缓冲区,不实用,容量太小
3)配置是否为阻塞方式。(默认为阻塞方式)
channel.configureBlocking(false); // 配置通道为非阻塞模式
如果配置了非阻塞模式,还需要调用 SocketChannel.finishConnect() 方法确保连接已经完成。
while (!channel.finishConnect()){// 不断检查是否完成了连接
Thread.sleep(10);
}
4)与服务端进行数据交互。
5)关闭连接。
在关闭连接时,如果客户端是写数据的一方,完成写入之后应该先调用一下 SocketChannel.shutdownOutput() ,此时读的一端能够检测到 read() 返回的 -1。然后调用 clser() 方法关闭通道。
channel.shutdownOutput(); // 关闭 TCP 输出,此时客户端会发送 -1 给服务端
channel.close(); // 关闭通道
服务端在客户端由连接过来时会创建一个 SocketChannel,不需要手动创建,后续步骤和客户端一样。下面有完整的示例。
2.3 ServerSocketChannel
ServerSocketChannel 负责监听连接,服务端使用,在监听到 TCP 连接时会产生一个 SocketChannel 实例与客户端进行连接和数据交互。一般为了支持并发,服务端在产生 SocketChannel 之后可以通道实例放到一个队列中,用一个线程池去处理队列中的通道。不过这种方式并不能支持高并发,要支持高并发应该使用基于多路复用 I/O 模型的选择器。
1)打开一个 ServerSocketChannel 通道, 绑定端口。
ServerSocketChannel server = ServerSocketChannel.open(); // 打开通道
2)绑定端口
server.bind(new InetSocketAddress(9090)); // 绑定端口
3)阻塞等待连接到来。有新连接时会创建一个 SocketChannel 通道,服务端可以通过这个通道与连接过来的客户端进行通信。等待连接到来的代码一般放在一个循环结构中。
SocketChannel client = server.accept(); // 阻塞,直到有连接过来
4)通过 SocketChannel 与客户端进行数据交互
5)关闭 SocketChannel
client.close();
2.4 基于套接字通道的 TCP 通信完整示例
用户在客户端控制台数据要发送的内容,服务端接收内容并打印在控制台。客户端输入 "Bye" 之后,断开与服务端的连接。
TCP 客户端代码:
public class SocketChannelWrite {
public static void main(String[] args) throws IOException, InterruptedException {
SocketChannel channel = SocketChannel.open(); // 打开通道,此时还没有打开 TCP 连接
channel.connect(new InetSocketAddress("localhost", 9090)); // 连接到服务端
ByteBuffer buf = ByteBuffer.allocate(10); // 分配一个 10 字节的缓冲区,不实用,容量太小
Scanner scanner = new Scanner(System.in); // 扫描控制台输入
scanner.useDelimiter("
");
while(true){
String msg = scanner.next() + "
"; // 读取控制台输入的消息,再拼接上换行符
for(int i=0; i<msg.length(); i++){ // 逐个字符遍历输入的内容
buf.put((byte)msg.charAt(i)); // 将字符逐个放入缓冲区
if(buf.position() == buf.limit() || i == msg.length()-1){ // 缓冲区已满或者
buf.flip(); // 缓冲区切换到读模式
channel.write(buf); // 往通道写入数据
buf.clear(); // 清空缓冲区,缓冲区切换到写入模式
}
}
if("Bye
".equals(msg)){
channel.shutdownOutput(); // 关闭 TCP 输出,此时客户端会发送 -1 给服务端
channel.close(); // 关闭通道
break;
}
}
}
}
TCP 服务端代码:
public class ServerSocketChannelRead {
public static void main(String[] args) throws IOException {
ServerSocketChannel server = ServerSocketChannel.open(); // 打开通道
server.bind(new InetSocketAddress(9090)); // 绑定端口
ByteBuffer buff = ByteBuffer.allocate(10); // 为了代码演示,只分配容量为 10 字节的缓冲区
while (true) {
SocketChannel client = server.accept(); // 阻塞,直到有连接过来
System.out.println("Client connected.");
while (true) { // 循环读取客户端发送过来的数据
if(client.read(buff) == -1){ // 客户端关闭了输出之后,阻塞的 client.read(buf) 会立即返回 -1,此时 buf 中没有内容
client.close(); // 关闭通道
System.out.println("Client closed the connection.");
break;
}
buff.flip(); // 切换到读模式
while (buff.position() < buff.limit()) {
System.out.print((char) buff.get()); // 一个字符一个字符打印出来
}
buff.clear(); // 切换到写模式
}
}
}
}
2.5 DatagramChannel
数据报通道 DatagramChannel 表示 UDP 通道。UDP 是无连接协议,在收发数据时不需要进行连接。与 FileChannel 和 SocketChannel 使用 read()/write() 不同,DatagramChannel 通常使用 receive()/send() 来收发数据。receive() 在接收数据之后会返回发送方的地址,send() 方法在发送数据的时候需要指定接收方的地址。
DatagramChannel 支持阻塞模式和非阻塞模式。非阻塞模式时,receive(ByteBuffer dst) 方法会立即返回,如果有数据,则会返回发送方的地址;如果没有数据,则返回 null。类似地,非阻塞模式下 send(ByteBuffer src, SocketAddress) 也会立即返回,返回的结果为发送的字节数。
DatagramChannel 作为客户端操作流程:
1)打开通道
DatagramChannel channel = DatagramChannel.open();
2)配置阻塞模式
channel.configureBlocking(false); // 非阻塞模式
3)分配缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024); // 分配 1024 字节的缓冲区
4)数据交互
数据报通道 DatagramChannel 通过 receive()/send() 方法来进行数据的交互。需要注意的是,发送数据时,每次最多发送一个 UDP 数据报的大小(理论上是 65535-8 字节);因此,当缓冲区过大时,需要考虑多次发送。发送数据的时候需要指定地址。
另外,DatagramChannel 指定了 connect(SocketAddress remote) 方法,传入通信对方的地址。如果调用了此方法,则该通道只能和指定的地址进行数据交互,即使 send() 指定了其它的地址也没有。事实上,DatagramChannel 提供了 read()/write() 方法,这两个方法只有在 connect 指定了地址的情况下才能够使用,否则数据将被丢弃。
SocketAddress address = channel.receive(buf);
channel.send(buf, address);
5)关闭通道
channel.close();
DatagramChannel 作为服务端操作流程:
1)打开通道
与客户端打开通道的方式一样。
2)绑定要监听的端口
channel.bind(new InetSocketAddress(9090)); // 绑定要监听的端口
3)配置阻塞模式
4)分配缓冲区
5)接收客户端发送过来的数据
下面提供基于 DatagramChannel 进行 UDP 通信的完整示例代码。
2.6 基于 DatagrapChannel 的 UDP 通讯实例
服务端接收客户端发送过来的数据报,然后打印其内容,再向客户端发送一条消息,表示接收到的消息的大小。
public class DatagramChannelRead {
public static void main(String[] args) throws IOException {
DatagramChannel channel = DatagramChannel.open(); // 打开通道
channel.bind(new InetSocketAddress(9090)); // 绑定要监听的端口
ByteBuffer buf = ByteBuffer.allocate(1024); // 分配缓冲区
while (true){
SocketAddress address = channel.receive(buf); // 接收数据,获取发送方地址
buf.flip(); // 缓冲区切换为读模式
int len = buf.limit(); // 获取 buff 中数据的长度
System.out.println("Client -> " + new String(buf.array(), 0, len, StandardCharsets.UTF_8)); // 打印 buf 中的内容
buf.clear(); // 清空缓冲区,切换到写模式
buf.put(String.format("Received %4d bytes.", len).getBytes()); // 将要返回给发送端的消息填入缓冲区
buf.flip();
channel.send(buf, address); // send 一次性最多只能发送 65535 - 8 字节的数据,如果 buf 很大的话需要用一个循环去发送。
buf.clear();
}
客户端有2个线程, sender 线程接收用户在控制台输入的内容,接收一行输入的内容就发送给服务端;receiver 线程接收服务端返回的消息并打印在控制台。当用户输入 "Bye" 时,客户端退出。
public class DatagramChannelWrite {
public static void main(String[] args) throws IOException, InterruptedException {
DatagramChannel channel = DatagramChannel.open(); // 打开通道
InetSocketAddress serverAddress = new InetSocketAddress("localhost", 9090); // 声明服务端的地址
channel.configureBlocking(false); // 非阻塞模式
// 用于接收服务端发送过来的消息
Thread receiver = new Thread(()->{
ByteBuffer buf = ByteBuffer.allocate(1024); // 分配 1024 字节的缓冲区
while(!Thread.currentThread().isInterrupted()){ // 检查中断标志,如果被中断,则结束线程
try {
while (null == channel.receive(buf)) { // 循环接收数据
Thread.sleep(10); // 没有消息则 sleep 10ms
}
buf.flip();
System.out.println("Server -> " + new String(buf.array(), 0, buf.limit()));
buf.clear();
} catch (IOException e) {
e.printStackTrace();
}catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
Thread sender = new Thread(()->{
try {
ByteBuffer buf = ByteBuffer.allocate(1024);
Scanner scanner = new Scanner(System.in);
while (true){
String msg = scanner.nextLine();
if(msg.equals("Bye")) {
receiver.interrupt();
break;
}
buf.put(msg.getBytes(StandardCharsets.UTF_8));
buf.flip();
channel.send(buf, serverAddress);
buf.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
});
sender.start(); // 启动 sender 线程
receiver.start(); // 启动 receiver线程
receiver.join(); // 等待 receiver
channel.close(); // 关闭通道
}
}
3. 小结
1)Java NIO 中的通道结合缓冲区,提供了一种与流不一样的操作模式。通道是应用程序到 I/O 设备的一个打开的连接,应用程序可以往通道中写入数据或者从通道中读取数据。
2)NIO 中主要的通道有四种,磁盘文件 I/O 相关的 FileChannel,网络 I/O 相关的 SocketChannel, ServerSocketChannel 和 DatagramChannel。其中文件相关的通道只能以阻塞的方式进行 I/O 操作,而网络相关通道则可以通过阻塞方式和非阻塞方式进行通信。
以上是关于通道的一些基本概念和用法,就这些内容上来看,NIO 相对于普通的 I/O 并没有太大的优势(非阻塞网络 I/O除外);普通 I/O 流中的 BufferedInputStream, BufferedOutputSteram 能够起到和通道几乎一样的作用。事实上,基于内存映射技术的直接内存缓存提供了比普通 I/O 更加高效的访问磁盘文件方式;而 NIO 为网络 I/O 提供了非阻塞访问模型的接口,配合选择器 Selector,极大提高了 Java 程序所能够支持的并发数。