NIO 包及工作原理
针对传统I/O 工作模式的不足,NIO 工具包提出了基于Buffer(缓冲区)、Channel(通
道)、Selector(选择器)的新模式;Selector(选择器)、可选择的Channel(通道)和
SelectionKey(选择键)配合起来使用,可以实现并发的非阻塞型I/O 能力。
NIO 工具包的成员
Buffer(缓冲器)
Buffer 类是一个抽象类,它有7 个子类分别对应于七种基本的数据类型:ByteBuffer、
CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer 和ShortBuffer。每一个Buffer
对象相当于一个数据容器,可以把它看作内存中的一个大的数组,用来存储和提取所有基本
类型(boolean 型除外)的数据。Buffer 类的核心是一块内存区,可以直接对其执行与内存有关
的操作,利用操作系统特性和能力提高和改善Java 传统I/O 的性能。
Channel(通道)
Channel 被认为是NIO 工具包的一大创新点,是(Buffer)缓冲器和I/O 服务之间的通道,
具有双向性,既可以读入也可以写出,可以更高效的传递数据。我们这里主要讨论
ServerSocketChannel 和SocketChannel,它们都继承了SelectableChannel,是可选择的通道,
分别可以工作在同步和异步两种方式下(这里的可选择不是指可以选择两种工作方式,而是
指可以有选择的注册自己感兴趣的事件)。当通道工作在同步方式时,它的功能和编程方法
与传统的ServerSocket、Socket 对象相似;当通道工作在异步工作方式时,进行输入输出处
理不必等到输入输出完毕才返回,并且可以将其感兴趣的(如:接受操作、连接操作、读出
操作、写入操作)事件注册到Selector 对象上,与Selector 对象协同工作可以更有效率的支
持和管理并发的网络套接字连接。
Selector(选择器)和SelectionKey(选择键)
各类 Buffer 是数据的容器对象;各类Channel 实现在各类Buffer 与各类I/O 服务间传输
数据。Selector 是实现并发型非阻塞I/O 的核心,各种可选择的通道将其感兴趣的事件注册
到Selector 对象上,Selector 在一个循环中不断轮循监视这各些注册在其上的Socket 通道。
SelectionKey 类则封装了SelectableChannel 对象在Selector 中的注册信息。当Selector 监测
到在某个注册的SelectableChannel 上发生了感兴趣的事件时,自动激活产生一个SelectionKey
对象,在这个对象中记录了哪一个SelectableChannel 上发生了哪种事件,通过对被激活的
SelectionKey 的分析,外界可以知道每个SelectableChannel 发生的具体事件类型,进行相应的
处理。
NIO 工作原理
通过上面的讨论,我们可以看出在并发型服务器程序中使用NIO,实际上是通过网络事
件驱动模型实现的。我们应用Select 机制,不用为每一个客户端连接新启线程处理,而是将
其注册到特定的Selector 对象上,这就可以在单线程中利用Selector 对象管理大量并发的网
络连接,更好的利用了系统资源;采用非阻塞I/O 的通信方式,不要求阻塞等待I/O 操作完
成即可返回,从而减少了管理I/O 连接导致的系统开销,大幅度提高了系统性能。
当有读或写等任何注册的事件发生时,可以从Selector 中获得相应的
SelectionKey , 从SelectionKey 中可以找到发生的事件和该事件所发生的具体的
SelectableChannel,以获得客户端发送过来的数据。由于在非阻塞网络I/O 中采用了事件触
发机制,处理程序可以得到系统的主动通知,从而可以实现底层网络I/O 无阻塞、流畅地读
写,而不像在原来的阻塞模式下处理程序需要不断循环等待。使用NIO,可以编写出性能更
好、更易扩展的并发型服务器程序。
服务器端程序:
public class HelloWorldServer {
static int BLOCK = 1024;
static String name = "";
protected Selector selector;
protected ByteBuffer clientBuffer = ByteBuffer.allocate(BLOCK);
protected CharsetDecoder decoder;
static CharsetEncoder encoder = Charset.forName("GB2312").newEncoder();
public HelloWorldServer(int port) throws IOException {
selector = this.getSelector(port);
Charset charset = Charset.forName("GB2312");
decoder = charset.newDecoder();
}
// 获取Selector
protected Selector getSelector(int port) throws IOException {
ServerSocketChannel server = ServerSocketChannel.open();
Selector sel = Selector.open();
server.socket().bind(new InetSocketAddress(port));
server.configureBlocking(false);
server.register(sel, SelectionKey.OP_ACCEPT);
return sel;
}
// 监听端口
public void listen() {
try {
for (;;) {
selector.select();
Iterator iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = (SelectionKey) iter.next();
iter.remove();
process(key);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
// 处理事件
protected void process(SelectionKey key) throws IOException {
if (key.isAcceptable()) { // 接收请求
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel = server.accept();
//设置非阻塞模式
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) { // 读信息
SocketChannel channel = (SocketChannel) key.channel();
int count = channel.read(clientBuffer);
if (count > 0) {
clientBuffer.flip();
CharBuffer charBuffer = decoder.decode(clientBuffer);
name = charBuffer.toString();
// System.out.println(name);
SelectionKey sKey = channel.register(selector,
SelectionKey.OP_WRITE);
sKey.attach(name);
} else {
channel.close();
}
clientBuffer.clear();
} else if (key.isWritable()) { // 写事件
SocketChannel channel = (SocketChannel) key.channel();
String name = (String) key.attachment();
ByteBuffer block = encoder.encode(CharBuffer
.wrap("Hello !" + name));
channel.write(block);
//channel.close();
}
}
public static void main(String[] args) {
int port = 8888;
try {
HelloWorldServer server = new HelloWorldServer(port);
System.out.println("listening on " + port);
server.listen();
} catch (IOException e) {
e.printStackTrace();
}
}
}
客户端程序:
public class HelloWorldClient {
static int SIZE = 10;
static InetSocketAddress ip = new InetSocketAddress("localhost", 8888);
static CharsetEncoder encoder = Charset.forName("GB2312").newEncoder();
static class Message implements Runnable {
protected String name;
String msg = "";
public Message(String index) {
this.name = index;
}
public void run() {
try {
long start = System.currentTimeMillis();
//打开Socket通道
SocketChannel client = SocketChannel.open();
//设置为非阻塞模式
client.configureBlocking(false);
//打开选择器
Selector selector = Selector.open();
//注册连接服务端socket动作
client.register(selector, SelectionKey.OP_CONNECT);
//连接
client.connect(ip);
//分配内存
ByteBuffer buffer = ByteBuffer.allocate(8 * 1024);
int total = 0;
_FOR: for (;;) {
selector.select();
Iterator iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = (SelectionKey) iter.next();
iter.remove();
if (key.isConnectable()) {
SocketChannel channel = (SocketChannel) key
.channel();
if (channel.isConnectionPending())
channel.finishConnect();
channel
.write(encoder
.encode(CharBuffer.wrap(name)));
channel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key
.channel();
int count = channel.read(buffer);
if (count > 0) {
total += count;
buffer.flip();
while (buffer.remaining() > 0) {
byte b = buffer.get();
msg += (char) b;
}
buffer.clear();
} else {
client.close();
break _FOR;
}
}
}
}
double last = (System.currentTimeMillis() - start) * 1.0 / 1000;
System.out.println(msg + "used time :" + last + "s.");
msg = "";
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
String names[] = new String[SIZE];
for (int index = 0; index < SIZE; index++) {
names[index] = "jeff[" + index + "]";
new Thread(new Message(names[index])).start();
}
}
}