Java NIO
节选自:NIO入门
NIO
新的输入/输出 (NIO) 库是在 JDK 1.4 中引入的。
NIO 弥补了原来的 I/O 的不足,它在标准 Java 代码中提供了快速的、面向块的 I/O。
通过定义包括数据的类。以及通过以块的形式处理这些数据,NIO 不用使用本机代码就能够利用低级优化,这是原来的 I/O 包所无法做到的。
通道和缓冲区
概述
通道 和 缓冲区 是 NIO 中的核心对象。差点儿在每个 I/O 操作中都要使用它们。
通道是对原 I/O 包中的流的模拟。到不论什么目的地(或来自不论什么地方)的全部数据都必须通过一个 Channel 对象。一个 Buffer 实质上是一个容器对象。发送给一个通道的全部对象都必须首先放到缓冲区中;相同地。从通道中读取的不论什么数据都要读到缓冲区中。
缓冲区
Buffer 是一个对象。 它包括一些要写入或者刚读出的数据。
在 NIO 中增加 Buffer 对象,体现了新库与原 I/O 的一个重要差别。
在面向流的 I/O 中,您将数据直接写入或者将数据直接读到 Stream 对象中。
在 NIO 库中,全部数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的。
在写入数据时,它是写入到缓冲区中的。不论什么时候訪问 NIO 中的数据,您都是将它放到缓冲区中。
缓冲区实质上是一个数组。通常它是一个字节数组,可是也能够使用其它种类的数组。可是一个缓冲区不 仅仅 是一个数组。缓冲区提供了对数据的结构化訪问。并且还能够跟踪系统的读/写进程。
缓冲区类型
最经常使用的缓冲区类型是 ByteBuffer。
一个 ByteBuffer 能够在其底层字节数组上进行 get/set 操作(即字节的获取和设置)。
ByteBuffer 不是 NIO 中唯一的缓冲区类型。其实,对于每一种基本 Java 类型都有一种缓冲区类型:
- ByteBuffer
- CharBuffer
- ShortBuffer
- IntBuffer
- LongBuffer
- FloatBuffer
- DoubleBuffer
每个 Buffer 类都是 Buffer 接口的一个实例。 除了 ByteBuffer,每个 Buffer 类都有全然一样的操作,仅仅是它们所处理的数据类型不一样。由于大多数标准 I/O 操作都使用 ByteBuffer,所以它具有全部共享的缓冲区操作以及一些特有的操作。
如今您能够花一点时间运行 UseFloatBuffer.java,它包括了类型化的缓冲区的一个应用样例。
通道
Channel是一个对象,能够通过它读取和写入数据。
拿 NIO 与原来的 I/O 做个比較,通道就像是流。
正如前面提到的。全部数据都通过 Buffer 对象来处理。您永远不会将字节直接写入通道中,相反,您是将数据写入包括一个或者多个字节的缓冲区。相同,您不会直接从通道中读取字节。而是将数据从通道读入缓冲区,再从缓冲区获取这个字节。
通道类型
通道与流的不同之处在于通道是双向的。而流仅仅是在一个方向上移动(一个流必须是 InputStream 或者 OutputStream 的子类), 而 通道 能够用于读、写或者同一时候用于读写。
由于它们是双向的,所以通道能够比流更好地反映底层操作系统的真实情况。特别是在 UNIX 模型中,底层操作系统通道是双向的。
实例
拷贝文件
import java.util.*;
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
/*
* 利用NIO进行文件的拷贝
* 步骤:
* 1. 创建文件通道 Channel
* 2. 创建 buffer
* 3. 读-写
*/
public class CopyFileNIO
{
public static void main(String[] args) throws Exception
{
System.out.println("Hello World!");
long start = System.currentTimeMillis();
copyBIO();
long end = System.currentTimeMillis();
System.out.println(end - start);
}
public static void copyNIO() throws Exception {
FileInputStream in = new FileInputStream("readme.txt");
FileOutputStream out = new FileOutputStream("copyNIO.txt");
//获取通道对象
FileChannel fcin = in.getChannel();
FileChannel fcout = out.getChannel();
//创建Buffer
ByteBuffer bbf = ByteBuffer.allocate(1024);
while(fcin.read(bbf) != -1){
//clear() 方法重设缓冲区,使它能够接受读入的数据。
//flip() 方法让缓冲区能够将新读入的数据写入还有一个通道。
bbf.flip();
fcout.write(bbf);
bbf.clear();
}
fcin.close();
fcout.close();
in.close();
out.close();
}
public static void copyBIO() throws Exception {
FileInputStream in = new FileInputStream("readme.txt");
FileOutputStream out = new FileOutputStream("copyBIO.txt");
int b = 0;
while((b = in.read()) != -1){
out.write(b);
}
in.close();
out.close();
}
}
通过測试:NIO–9 BIO—80,可知NIO效率明显高于BIO
缓冲区内部细节
概述
本节将介绍 NIO 中两个重要的缓冲区组件:状态变量和訪问方法 (accessor)。
状态变量是前一节中提到的”内部统计机制”的关键。每个读/写操作都会改变缓冲区的状态。通过记录和跟踪这些变化,缓冲区就能够内部地管理自己的资源。
在从通道读取数据时。数据被放入到缓冲区。在有些情况下,能够将这个缓冲区直接写入还有一个通道,可是在普通情况下,您还须要查看数据。这是使用 訪问方法 get() 来完毕的。相同,假设要将原始数据放入缓冲区中,就要使用訪问方法 put()。
在本节中,您将学习关于 NIO 中的状态变量和訪问方法的内容。我们将描写叙述每个组件,并让您有机会看到它的实际应用。尽管 NIO 的内部统计机制初看起来可能非常复杂。可是您非常快就会看到大部分的实际工作都已经替您完毕了。
您可能习惯于通过手工编码进行簿记 ― 即使用字节数组和索引变量,如今它已在 NIO 中内部地处理了。
状态变量
能够用三个值指定缓冲区在随意时刻的状态:
- position
- limit
- capacity
这三个变量一起能够跟踪缓冲区的状态和它所包括的数据。
我们将在以下的小节中具体分析每个变量。还要介绍它们怎样适应典型的读/写(输入/输出)进程。在这个样例中,我们假定要将数据从一个输入通道复制到一个输出通道。
- Position
- 从通道读取数据时,position记录已经向缓冲区写入了多少数据
- 向通道写入数据时,position记录已经从缓冲区读取了多少数据
- Limit
- 从缓冲区写入通道时,limit记录了缓冲区有多少数据能够写入通道
- 从通道读入缓冲区时,还有多少空间能够放入数据
position 总是小于或者等于 limit。
- Capacity
缓冲区的 capacity 表明能够储存在缓冲区中的最大数据容量。实际上,它指定了底层数组的大小, 或者至少是指定了准许我们使用的底层数组的容量。
limit 决不能大于 capacity。
方法
flip
当从通道读取数据到缓冲区后,须要将缓冲区的数据写入还有一个通道。则写入之前,须要调用 flip() 方法。这种方法做两件非常重要的事:
- 将 limit 设置为当前 position。
- 将 position 设置为 0。
将 buffer 又一次设置为写缓冲区
clear
最后一步是调用缓冲区的 clear() 方法。这种方法重设缓冲区以便接收很多其它的字节。
Clear 做两种非常重要的事情:
- 将 limit 设置为与 capacity 相同。
- 设置 position 为 0。
将 buffer 又一次设置为读缓冲区
get
ByteBuffer 类中有四个 get() 方法:
- byte get();
- ByteBuffer get( byte dst[] );
- ByteBuffer get( byte dst[], int offset, int length );
- byte get( int index );
put
ByteBuffer 类中有五个 put() 方法:
- ByteBuffer put( byte b );
- ByteBuffer put( byte src[] );
- ByteBuffer put( byte src[], int offset, int length );
- ByteBuffer put( ByteBuffer src );
- ByteBuffer put( int index, byte b );
连网和异步 I/O
概述
连网是学习异步 I/O 的非常好基础,而异步 I/O 对于在 Java 语言中运行不论什么输入/输出过程的人来说,无疑都是必须具备的知识。NIO 中的连网与 NIO 中的其它不论什么操作没有什么不同 ― 它依赖通道和缓冲区,而您通常使用 InputStream 和 OutputStream 来获得通道。
本节首先介绍异步 I/O 的基础 ― 它是什么以及它不是什么。然后转向更有用的、程序性的样例。
异步 I/O
异步 I/O 是一种 没有堵塞地 读写数据的方法。通常,在代码进行 read() 调用时,代码会堵塞直至有可供读取的数据。相同, write() 调用将会堵塞直至数据能够写入。
还有一方面,异步 I/O 调用不会堵塞。
相反,您将注冊对特定 I/O 事件的兴趣 ― 可读的数据的到达、新的套接字连接。等等。而在发生这种事件时,系统将会告诉您。
异步 I/O 的一个优势在于,它同意您同一时候依据大量的输入和输出运行 I/O。同步程序经常要求助于轮询,或者创建许很多多的线程以处理大量的连接。
使用异步 I/O,您能够监听不论什么数量的通道上的事件。不用轮询,也不用额外的线程。
我们将通过研究一个名为 MultiPortEcho.java 的样例程序来查看异步 I/O 的实际应用。
这个程序就像传统的 echo server(回显server),它接受网络连接并向它们回响它们可能发送的数据。只是它有一个附加的特性,就是它能同一时候监听多个端口,并处理来自全部这些端口的连接。并且它仅仅在单个线程中完毕全部这些工作。
Selectors
异步 I/O 中的核心对象名为 Selector。Selector 就是您注冊对各种 I/O 事件感兴趣的地方,并且当那些事件发生时,就是这个对象告诉您所发生的事件。
它维护了三个选择键集:
- 键集
包括的键表示当前通道到此选择器的注冊。此集合由 keys 方法返回。
- 已选择键集
是这样一种键的集合,即在前一次选择操作期间。检測每个键的通道是否已经至少为该键的相关操作集所标识的一个操作准备就绪。此集合由 selectedKeys 方法返回。已选择键集始终是键集的一个子集。
- 已取消键集
是已被取消但其通道尚未注销的键的集合。不可直接訪问此集合。已取消键集始终是键集的一个子集。
所以,我们须要做的第一件事就是创建一个 Selector:
Selector selector = Selector.open();
然后,我们将对不同的通道对象调用 register() 方法,以便注冊我们对这些对象中发生的 I/O 事件的兴趣。register() 的第一个參数总是这个 Selector。
打开一个 ServerSocketChannel
为了接收连接。我们须要一个 ServerSocketChannel。其实,我们要监听的每个端口都须要有一个 ServerSocketChannel 。对于每个端口,我们打开一个ServerSocketChannel。例如以下所看到的:
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking( false );
//获取与此通道关联的server套接字。
ServerSocket ss = ssc.socket();
InetSocketAddress address = new InetSocketAddress( ports[i] );
ss.bind( address );
第一行创建一个新的 ServerSocketChannel 。最后三行将它绑定到给定的端口。
第二行将 ServerSocketChannel 设置为 非堵塞的 。我们必须对每个要使用的套接字通道调用这种方法。否则异步 I/O 就不能工作。
选择键
下一步是将新打开的 ServerSocketChannels 注冊到 Selector上。
为此我们使用 ServerSocketChannel.register() 方法。例如以下所看到的:
SelectionKey key = ssc.register( selector, SelectionKey.OP_ACCEPT );
register() 的第一个參数总是这个 Selector。
第二个參数是 OP_ACCEPT,这里它指定我们想要监听 accept 事件,也就是在新的连接建立时所发生的事件。这是适用于 ServerSocketChannel 的唯一事件类型。
请注意对 register() 的调用的返回值。
SelectionKey 代表这个通道在此 Selector 上的这个注冊。
当某个 Selector 通知您某个传入事件时,它是通过提供对应于该事件的 SelectionKey 来进行的。SelectionKey 还能够用于取消通道的注冊。
内部循环
如今已经注冊了我们对一些 I/O 事件的兴趣。以下将进入主循环。使用 Selectors 的差点儿每个程序都像以下这样使用内部循环:
int num = selector.select();
Set selectedKeys = selector.selectedKeys();
Iterator it = selectedKeys.iterator();
while (it.hasNext()) {
SelectionKey key = (SelectionKey)it.next();
// ... deal with I/O event ...
}
首先,我们调用 Selector 的 select() 方法。这种方法会堵塞,直到至少有一个已注冊的事件发生。
当一个或者很多其它的事件发生时, select() 方法将返回所发生的事件的数量。
接下来。我们调用 Selector 的 selectedKeys() 方法,它返回发生了事件的 SelectionKey 对象的一个 集合 。
我们通过迭代 SelectionKeys 并依次处理每个 SelectionKey 来处理事件。对于每个 SelectionKey。您必须确定发生的是什么 I/O 事件,以及这个事件影响哪些 I/O 对象。
监听新连接
程序运行到这里,我们仅注冊了 ServerSocketChannel,并且仅注冊它们“接收”事件。
为确认这一点,我们对 SelectionKey 调用 readyOps() 方法,并检查发生了什么类型的事件:
if ((key.readyOps() & SelectionKey.OP_ACCEPT)
== SelectionKey.OP_ACCEPT) {
// Accept the new connection
// ...
}
能够肯定地说, readOps() 方法告诉我们该事件是新的连接。
接受新的连接
由于我们知道这个server套接字上有一个传入连接在等待,所以能够安全地接受它;也就是说。不用操心 accept() 操作会堵塞:
ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
SocketChannel sc = ssc.accept();
下一步是将新连接的 SocketChannel 配置为非堵塞的。并且由于接受这个连接的目的是为了读取来自套接字的数据,所以我们还必须将 SocketChannel 注冊到 Selector上,例如以下所看到的:
sc.configureBlocking( false );
SelectionKey newKey = sc.register( selector, SelectionKey.OP_READ );
注意我们使用 register() 的 OP_READ 參数,将 SocketChannel 注冊用于 读取 而不是 接受 新连接。
删除处理过的 SelectionKey
在处理 SelectionKey 之后。我们差点儿能够返回主循环了。可是我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。假设我们没有删除处理过的键。那么它仍然会在主集合中以一个激活的键出现,这会导致我们尝试再次处理它。我们调用迭代器的 remove() 方法来删除处理过的 SelectionKey:
it.remove();
如今我们能够返回主循环并接受从一个套接字中传入的数据(或者一个传入的 I/O 事件)了。
传入的 I/O
当来自一个套接字的数据到达时,它会触发一个 I/O 事件。这会导致在主循环中调用 Selector.select(),并返回一个或者多个 I/O 事件。这一次, SelectionKey 将被标记为 OP_READ 事件,例如以下所看到的:
} else if ((key.readyOps() & SelectionKey.OP_READ)
== SelectionKey.OP_READ) {
// Read the data
SocketChannel sc = (SocketChannel)key.channel();
// ...
}
与曾经一样,我们取得发生 I/O 事件的通道并处理它。在本例中,由于这是一个 echo server。我们仅仅希望从套接字中读取数据并立即将它发送回去。
回到主循环
每次返回主循环。我们都要调用 select 的 Selector()方法,并取得一组 SelectionKey。每个键代表一个 I/O 事件。我们处理事件。从选定的键集中删除 SelectionKey,然后返回主循环的顶部。
这个程序有点过于简单。由于它的目的仅仅是展示异步 I/O 所涉及的技术。在现实的应用程序中。您须要通过将通道从 Selector 中删除来处理关闭的通道。
并且您可能要使用多个线程。
这个程序能够仅使用一个线程,由于它仅仅是一个演示,可是在现实场景中,创建一个线程池来负责 I/O 事件处理中的耗时部分会更有意义。
JAVA NIO库
NIO相关类定义在了java.nio和java.nio.channels这两个包里
以下摘自API文档
selector类
方法 | 意义 |
---|---|
abstract void close() | 关闭此选择器。 |
abstract boolean isOpen() | 告知此选择器是否已打开。 |
abstract Set<SelectionKey> keys() | 返回此选择器的键集。 |
static Selector open() | 打开一个选择器。 |
abstract SelectorProvider provider() | 返回创建此通道的提供者。 |
abstract int select() | 选择一组键,其对应的通道已为 I/O 操作准备就绪。 |
abstract int select(long timeout) | 选择一组键。其对应的通道已为 I/O 操作准备就绪。 |
abstract Set<SelectionKey> selectedKeys() | 返回此选择器的已选择键集。 |
abstract int selectNow() | 选择一组键,其对应的通道已为 I/O 操作准备就绪。 |
abstract Selector wakeup() | 使尚未返回的第一个选择操作立即返回。 |
凝视:
1.open()创建一个selector,然后这个selector维护键集,已选择键集。已取消键集
2.selector(),selector(timeout),selector()删除键集中已取消键集中的键,关闭通道,将准备就绪的键增加已选择键集
ServerSocketChannel
针对面向流的侦听套接字的可选择通道。
方法 | 意义 |
---|---|
abstract SocketChannel accept() | 接受到此通道套接字的连接。 |
static ServerSocketChannel open() | 打开server套接字通道。 |
abstract ServerSocket socket() | 获取与此通道关联的server套接字。 |
int validOps() | 返回一个操作集,标识此通道所支持的操作。 |
凝视:
1.open()打开一个server套接字通道,打开之后须要将server套接字绑定,通道和套接字并不一样
2.accept(),
3.socket,获取server套接字,之后调用bind()绑定port
SelectionKey
此类定义了全部已知的操作集位 (operation-set bit),可是给定的通道具体支持哪些位则取决于该通道的类型。SelectableChannel 的每个子类都定义了 validOps() 方法,该方法返回的集合恰好标识该通道支持的操作。
试图设置或測试某个键的通道所不支持的操作集位将导致抛出对应的运行时异常。
字段 | 意义 |
---|---|
static int OP_ACCEPT | 用于套接字接受操作的操作集位。 |
static int OP_CONNECT | 用于套接字连接操作的操作集位。 |
static int OP_READ | 用于读取操作的操作集位。 |
static int OP_WRITE | 用于写入操作的操作集位。 |
方法 | 意义 |
---|---|
abstract void cancel() | 请求取消此键的通道到其选择器的注冊。 |
abstract SelectableChannel channel() | 返回创建此键的通道。 |
boolean isAcceptable() | 測试此键的通道是否已准备好接受新的套接字连接。 |
boolean isConnectable() | 測试此键的通道是否已完毕其套接字连接操作。 |
boolean isReadable() | 測试此键的通道是否已准备好进行读取。 |
boolean isWritable() | 測试此键的通道是否已准备好进行写入。 |
SocketChannel
针对面向流的连接套接字的可选择通道。
方法 | 意义 |
---|---|
abstract boolean connect(SocketAddress remote) | 连接此通道的套接字。 |
abstract boolean finishConnect() | 完毕套接字通道的连接过程。 |
abstract boolean isConnected() | 推断是否已连接此通道的网络套接字。 |
abstract boolean isConnectionPending() | 推断此通道上是否正在进行连接操作。 |
static SocketChannel open() | 打开套接字通道。 |
static SocketChannel open(SocketAddress remote) | 打开套接字通道并将其连接到远程地址。 |
abstract int read(ByteBuffer dst) | 将字节序列从此通道中读入给定的缓冲区。 |
long read(ByteBuffer[] dsts) | 将字节序列从此通道读入给定的缓冲区。 |
abstract long read(ByteBuffer[] dsts, int offset, int length) | 将字节序列从此通道读入给定缓冲区的子序列中。 |
abstract Socket socket() | 获取与此通道关联的套接字。 |
int validOps() | 返回一个操作集。标识此通道所支持的操作。 |
abstract int write(ByteBuffer src) | 将字节序列从给定的缓冲区中写入此通道。 |
long write(ByteBuffer[] srcs) | 将字节序列从给定的缓冲区写入此通道。 |
abstract long write(ByteBuffer[] srcs, int offset, int length) | 将字节序列从给定缓冲区的子序列写入此通道。 |
凝视:
显然这个非常重要的,由于数据的传输和接收都是通过SocketChannel。
实例
server端
public class Server
{
public static void main(String[] args)
{
System.out.println("Hello World!");
try{
//打开选择器
Selector selector = Selector.open();
//打开server套接字通道
ServerSocketChannel ssc = ServerSocketChannel.open();
//设置为非堵塞模式
ssc.configureBlocking( false );
//绑定套接字
ssc.socket().bind(new InetSocketAddress(8888));
//将server套接字通道注冊到selector,注冊事件是OP_ACCEPT,即连接请求
ssc.register(selector,SelectionKey.OP_ACCEPT);
System.out.println("server启动!"
);
while(true){
//查询是否有IO事件准备就绪。没有就堵塞
selector.select();
//获得准备就绪的SelectionKey
Set<SelectionKey> keys = selector.selectedKeys();
//迭代器訪问
Iterator<SelectionKey> it = keys.iterator();
while(it.hasNext()){
//获取准备就绪的SelectionKey
SelectionKey key = it.next();
//从已选择键集中删除,防止下次反复操作
it.remove();
//推断Key的准备就绪操作
if(key.isAcceptable()){
//准备好连接操作
SocketChannel sc = ((ServerSocketChannel)key.channel()).accept();
//将新的连接套接字注冊到selector。感兴趣的事件是有数据可读
sc.configureBlocking(false);
sc.register(selector,SelectionKey.OP_READ);
sc.write(ByteBuffer.wrap(
new String("你好。client").getBytes()
));
}else if(key.isReadable()){
//准备好读操作
//获取该键上的Channel
SocketChannel sc = (SocketChannel)key.channel();
// 创建读取的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(100);
sc.read(buffer);
//获取缓冲区顶层的数组
byte[] data = buffer.array();
String msg = new String(data).trim();
System.out.println("服务端收到信息:"+msg);
ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
sc.write(outBuffer);// 将消息回送给client
}
}
}
}catch(ClosedChannelException cce){
cce.printStackTrace();
}catch(IOException ioe){
ioe.printStackTrace();
}
}
}
client
public class Client
{
public static void main(String[] args) throws Exception
{
System.out.println("client启动。");
//创建一个selector用来管理通道
Selector selector = Selector.open();
//创建套接字通道
SocketChannel sc = SocketChannel.open();
//设置为非堵塞方式
sc.configureBlocking(false);
//向selector注冊,感兴趣的事件是连接成功
sc.register(selector,SelectionKey.OP_CONNECT);
//发送请求连接
sc.connect(new InetSocketAddress("localhost",8888));
while(true){
//查询是否有IO事件准备就绪,没有的话堵塞
selector.select();
//感兴趣的事件出现,获取已选择键集
Set<SelectionKey> keys = selector.selectedKeys();
//遍历已选择键集,推断准备就绪的事件
Iterator<SelectionKey> it = keys.iterator();
while(it.hasNext()){
SelectionKey key = it.next();
it.remove();
if(key.isConnectable()){
//通道是否已完毕其套接字连接操作
sc = (SocketChannel)key.channel();
//这里获取到的SocketChannel就是上面创建的Channel
//System.out.println(sc.equals(sc2));
//是否正在连接
if(sc.isConnectionPending()){
sc.finishConnect();
}
//发送信息
sc.write(ByteBuffer.wrap(new String("你好,server!").getBytes()));
//在和服务端连接成功之后,为了能够接收到服务端的信息,
//须要给通道设置读的权限。
sc.register(selector, SelectionKey.OP_READ);
System.out.println(selector.keys().size());
}else if(key.isReadable()){
//准备好读操作
//获取该键上的Channel
sc = (SocketChannel)key.channel();
// 创建读取的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(100);
sc.read(buffer);
//获取缓冲区顶层的数组
byte[] data = buffer.array();
String msg = new String(data).trim();
System.out.println("client向server端发送的消息是:"+msg);
}
}
}
}
}