简介
Java NIO(New I/O),是一种同步非阻塞的I/O模型,也是I/O多路复用的基础,已经被越来越多地应用到大型应用服务器,成为解决高并发与大量连接、I/O处理问题的有效方式。
不同的IO模型比较如下图所示。
从图中可以看出,阻塞的IO模型在资源没有被满足的时候,读取线程是呈阻塞态的,而非阻塞的IO模型就像轻量级锁那样,通过反复询问的方式来获取资源。
架构
NIO主要有三大核心部分:
- Channel(通道)
- Buffer(缓冲区)
- Selector(选择器)
传统的IO是基本字节流和字符流进行操作的,而NIO是基于Channel和Buffer进行操作,数据是经过通道进入缓存区,或者从缓存区写入到通道中。选择器用于监听多个通道的时间,由单线程维护,也就是起着轮询的作用。
Channel
基本上,所有的IO都在NIO中的一个Channel开始,Channel有点像Stream,数据可以从Buffer写到Channel,也可以从Channel写到Buffer中。
- 既可以从通道中读取数据,又可以写数据到通道。但流的读写通常是单向的。
- 通道可以异步地读写。
- 通道中的数据总是要先读到一个Buffer,或者总是要从一个Buffer中写入。
Channel可以根据具体的使用场景,分为很多类型:
- FileChannel
从文件中读写数据。 - DatagramChannel
能通过UDP读写网络中的数据。 - SocketChannel
能通过TCP读写网络中的数据。 - ServerSocketChannel
可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。
这些通道行涵盖了网络IO以及文件IO。其中FileChannel不能切换到非阻塞模式,而套接字通道都可以。
Buffer
Java NIO中的Buffer用于和NIO通道进行交互。数据是从通道读入缓冲区,或者从缓冲区写入到通道中的。缓冲区本质上就是一个可以读写的内存区域。这块内存被包装成NIO Buffer对象,并提供了一组方法,用来方便的访问该块内存。
根据缓存区数组的数据类型,Buffer分为以下的类型:
- ByteBuffer
- MappedByteBuffer
- CharBuffer
- DoubleBuffer
- FloatBuffer
- IntBuffer
- LongBuffer
- ShortBuffer
其实很像JDK中Stream中的缓冲数组。
Selector
Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。
仅用单个线程来处理多个Channels的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程处理所有的通道。对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统的一些资源(如内存)。因此,使用的线程越少越好。
基本使用
Channel和Buffer
通常Channel和Buffer需要配合使用。
Buffer的基本使用流程:
- 创建Buffer对象并分配缓冲区,调用对应类型Buffer的allocate()方法即可;
- 写入数据到Buffer,有两种方式,通过Channel.read()方法或者直接调用buffer的put()方法;
- 调用flip()方法将Buffer从写模式切换到读模式;
- 从Buffer中读取数据,也有两种方式,通过Channel.write()或者调用buffer的get()方法;
- 调用clear()方法或者compact()方法清空缓冲区,从而可以再次写入。
一个简单的文件读取例子。
// 读取文件
RandomAccessFile file = new RandomAccessFile("data/nio_data.txt", "rw");
// 构建文件通道
FileChannel inChannel = file.getChannel();
// 构建缓冲区,大小未16
ByteBuffer buf = ByteBuffer.allocate(16);
// 从通道中写一次数据到缓存区
int len = inChannel.read(buf);
// 一直将文件读完
while(len != -1) {
System.out.println(len);
// 将读指针指到缓存头部,从而切换到读模式
buf.flip();
// 依次读取缓存中的数据
while(buf.hasRemaining()) {
System.out.print((char) buf.get());
}
// 清空缓存区
buf.clear();
// 重新将通道中剩余数据读到缓冲区
len = inChannel.read(buf);
}
// 关闭文件
file.close();
Selector
基本流程如下:
- 通过调用Selector.open()方法创建一个Selector;
- 通过SelectableChannel.register()将Channel注册到selector上;
- 通过Selector.select()选择就绪的通道,如果全部事件都没有就绪,就会阻塞;
- 通过调用selector的selectedKeys()方法,访问“已选择键集(selected key set)”中的就绪通道;
- 遍历键集合来访问就绪的通道,key.channel()方法就能获取当前的可用的通道。
一个精简的示例(不可运行):
Selector selector = Selector.open();
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
while(true) {
int readyChannels = selector.select();
if(readyChannels == 0) continue;
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}
}
简单的Reactor模式:注册所有感兴趣的事件处理器,单线程轮询选择就绪事件,执行事件处理器(不可运行):
interface ChannelHandler{
void channelReadable(Channel channel);
void channelWritable(Channel channel);
}
class Channel{
Socket socket;
Event event;//读,写或者连接
}
//IO线程主循环:
class IoThread extends Thread{
public void run(){
Channel channel;
//选择就绪的事件和对应的连接
while(channel=Selector.select()){
if(channel.event==accept){
//如果是新连接,则注册一个新的读写处理器
registerNewChannelHandler(channel);
}
if(channel.event==write){
//如果可以写,则执行写事件
getChannelHandler(channel).channelWritable(channel);
}
if(channel.event==read){
//如果可以读,则执行读事件
getChannelHandler(channel).channelReadable(channel);
}
}
}
//所有channel的对应事件处理器
Map<Channel,ChannelHandler> handlerMap;
}
建立多个socket服务器t对多个端口的监听(可运行):
// 表示五个监听端口
int ports[] = {8000, 8001, 8002, 8003, 8005, 8006} ;
// 创建一个selector
Selector selector = Selector.open() ;
for(int i = 0;i < ports.length; i++){
ServerSocketChannel initSer = null;
// 打开服务器的通道
initSer = ServerSocketChannel.open();
// 服务器配置为非阻塞
initSer.configureBlocking(false);
// 创建一个socket服务器
ServerSocket initSock = initSer.socket();
InetSocketAddress address = null;
address = new InetSocketAddress(ports[i]);
// 实例化绑定地址
initSock.bind(address);
// 将channel注册到选择器上
initSer.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务器运行,在" + ports[i] + "端口监听。");
}
// 要接收全部生成的key,并通过连接进行判断是否获取客户端的输出
while(selector.select() > 0){
// 选择一组键,并且相应的通道已经准备就绪 selector.select()在此阻塞等待请求到来
Set<SelectionKey> selectedKeys = selector.selectedKeys();
// 取出全部生成的key
Iterator<SelectionKey> iter = selectedKeys.iterator();
while(iter.hasNext()){
SelectionKey key = iter.next();
// 取出每一个key
if(key.isAcceptable()){
ServerSocketChannel server = (ServerSocketChannel)key.channel();
// 接收新连接
SocketChannel client = server.accept();
// 配置为非阻塞
client.configureBlocking(false);
// 初始化接收缓存
ByteBuffer buf = ByteBuffer.allocate(16);
// 读取通道中的所有数据
int len = client.read(buf);
while(len != -1) {
System.out.println(len);
// 将读指针指到缓存头部,从而切换到读模式
buf.flip();
// 依次读取缓存中的数据
while(buf.hasRemaining()) {
System.out.print((char) buf.get());
}
// 清空缓存区
buf.clear();
// 重新将通道中剩余数据读到缓冲区
len = client.read(buf);
}
client.close();
}
}
selectedKeys.clear();
}
应用场景
服务器需要支持超大量的长时间连接。比如1000个连接以上,并且每个客户端并不会频繁地发送太多数据。例如总公司的一个中心服务器需要收集全国便利店各个收银机的交易信息,只需要少量线程按需处理维护的大量长期连接。
在活动连接数不是特别高(小于单机1000)的情况下,阻塞I/O模型是比较不错的,可以让每一个连接专注于自己的I/O并且编程模型简单,也不用过多考虑系统的过载、限流等问题。线程池本身就是一个天然的漏斗,可以缓冲一些系统处理不了的连接或请求。
Jetty、Mina、Netty、ZooKeeper等都是基于NIO方式实现。
源码分析
JDK中NIO的组件很多,这里就根据上面的基本使用进行举例分析。
Buffer
当我们调用 ByteBuffer buf = ByteBuffer.allocate(16);
的时候就能获得一个ByteBuffer,而allocate()函数中实际返回的是HeapByteBuffer对象。
ByteBuffer是一个抽象类,继承自Buffer抽象类,而HeapByteBuffer就是继承自ByteBuffer。
Buffer
Buffer具有几个核心属性:
- mark,用于标记当前读取的位置,用于暂时保存当前的位置,读到后面的时候还能回到前面来读,这和IO中的Stream中是一样的。
- capacity,表示当前缓冲区中数组的容量。
- position,在读取模式的时候,表示当前可读取的位置,在写入模式的时候,表示当前可写入的位置。
- limit,在读取模式的时候,表示当前最大可读范围,也就是有效数据的范围,在写入模式的时候,就是等于capacity的。
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
Buffer构造方法如下,是default,说明只是包内可用,子类也不可用:
// 顾名思义,初始化各个指针的位置
Buffer(int mark, int pos, int lim, int cap) { // package-private
if (cap < 0)
throw new IllegalArgumentException("Negative capacity: " + cap);
this.capacity = cap;
limit(lim);
position(pos);
if (mark >= 0) {
if (mark > pos)
throw new IllegalArgumentException("mark > position: ("
+ mark + " > " + pos + ")");
this.mark = mark;
}
}
在我们想要读取buffer的时候,会调用filp()函数,将写模式转化为读模式它的源码如下:
public final Buffer flip() {
// 将读的限制位置设置为当前写的位置
limit = position;
// 初始化读指针
position = 0;
// 初始化标记指针
mark = -1;
return this;
}
ByteBuffer
ByteBuffer是Buffer的一个子类,是字节缓冲区。ByteBuffer在Buffer之上定义的方法主要如下:
- 在内部数组的当前位置和指定位置的方式读取和写入字节。
- 通过get(byte[])的方法将ByteBuffer中的数据读取到内部数组中。
- 通过put(byte[])的方法将连续大量的数据写入数组。
- 在当前位置和指定位置的方式将其他类型的数据写入缓冲区或从缓冲区读取数据转换成特定类型。
- 提供将ByteBuffer转换成其他类型的Buffer的方法,例如变为CharBuffer。
- 提供compact、duplicate、slice来执行一些对ByteBuffer的操作。
字段如下:
// 字节缓冲区数组
final byte[] hb; // Non-null only for heap buffers
// 偏移量
final int offset;
boolean isReadOnly; // Valid only for heap buffers
构造方法如下:
// 传入缓冲区
ByteBuffer(int mark, int pos, int lim, int cap, // package-private
byte[] hb, int offset)
{
// 调用父类的构造器
super(mark, pos, lim, cap);
this.hb = hb;
this.offset = offset;
}
// 没有缓冲区
ByteBuffer(int mark, int pos, int lim, int cap) { // package-private
this(mark, pos, lim, cap, null, 0);
}
构造缓冲区方法:
// 将缓冲区去分配到直接内存当中
public static ByteBuffer allocateDirect(int capacity) {
return new DirectByteBuffer(capacity);
}
// 将缓冲区放到堆上
public static ByteBuffer allocate(int capacity) {
if (capacity < 0)
throw new IllegalArgumentException();
return new HeapByteBuffer(capacity, capacity);
}
// 包装一个已有的数组作为缓冲区数组,而且这个数组中已近存在数据了
public static ByteBuffer wrap(byte[] array,
int offset, int length)
{
try {
return new HeapByteBuffer(array, offset, length);
} catch (IllegalArgumentException x) {
throw new IndexOutOfBoundsException();
}
}
// 包装一个空的数组
public static ByteBuffer wrap(byte[] array) {
return wrap(array, 0, array.length);
}
HeapByteBuffer
HeapByteBuffer顾名思义就是JVM堆上的字节缓冲区,他用于缓存数据的byte数组就是直接在堆内申请的。
构造方法如下,区别在于不同的缓冲数组的来源,默认的构造方法直接就是new一个byte数组作为数据存储的缓冲区。
HeapByteBuffer(int cap, int lim) { // package-private
super(-1, 0, lim, cap, new byte[cap], 0);
/*
hb = new byte[cap];
offset = 0;
*/
}
HeapByteBuffer(byte[] buf, int off, int len) { // package-private
super(-1, off, off + len, buf.length, buf, 0);
/*
hb = buf;
offset = 0;
*/
}
protected HeapByteBuffer(byte[] buf,
int mark, int pos, int lim, int cap,
int off)
{
super(mark, pos, lim, cap, buf, off);
/*
hb = buf;
offset = off;
*/
}
HeapByteBuffer的方法就是实现了缓冲区基本的读写功能。
DirectByteBuffer
使用了直接内存的字节缓冲区。
字段:
// 从直接内存获取的缓冲区
protected static final Unsafe unsafe = Bits.unsafe();
// 缓冲数组的偏移
private static final long arrayBaseOffset = (long)unsafe.arrayBaseOffset(byte[].class);
// Cached unaligned-access capability
protected static final boolean unaligned = Bits.unaligned();
构造方法有很多,看一个默认设置大小的:
DirectByteBuffer(int cap) { // package-private
super(-1, 0, cap, cap);
// 内存是否按页分配对齐
boolean pa = VM.isDirectMemoryPageAligned();
// 获取每页内存大小
int ps = Bits.pageSize();
// 分配内存的大小,如果是按页对齐方式,需要再加一页内存的容量
long size = Math.max(1L, (long)cap + (pa ? ps : 0));
// 用Bits类保存总分配内存(按页分配)的大小和实际内存的大小
Bits.reserveMemory(size, cap);
long base = 0;
try {
// 在直接内存中分配缓冲的大小并获得基本地址
base = unsafe.allocateMemory(size);
} catch (OutOfMemoryError x) {
Bits.unreserveMemory(size, cap);
throw x;
}
// 初始化堆外内存的数据为0
unsafe.setMemory(base, size, (byte) 0);
if (pa && (base % ps != 0)) {
//计算堆外内存的基本地址
address = base + ps - (base & (ps - 1));
} else {
address = base;
}
// 释放内存会通过cleaner类操作
cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
att = null;
}
内部类:
// 用于内存清理的内部类
private static class Deallocator
implements Runnable
{
// 获取直接内存
private static Unsafe unsafe = Unsafe.getUnsafe();
private long address;
private long size;
private int capacity;
private Deallocator(long address, long size, int capacity) {
assert (address != 0);
this.address = address;
this.size = size;
this.capacity = capacity;
}
// 开始清理内存
public void run() {
if (address == 0) {
// Paranoia
return;
}
unsafe.freeMemory(address);
address = 0;
Bits.unreserveMemory(size, capacity);
}
}
基本的读写方法:
// 从直接内存中获取当前读指针的数据,并增加读指针
public byte get() {
return ((unsafe.getByte(ix(nextGetIndex()))));
}
// 获取指定位置的数据
public byte get(int i) {
return ((unsafe.getByte(ix(checkIndex(i)))));
}
// 在当前写指针的位置写入数据,并增加写指针
public ByteBuffer put(byte x) {
unsafe.putByte(ix(nextPutIndex()), ((x)));
return this;
}
// 在指定位置写入数据
public ByteBuffer put(int i, byte x) {
unsafe.putByte(ix(checkIndex(i)), ((x)));
return this;
}
Channel
以ServerSocketChannel为例。
ServerSocketChannel
主要就是提供了通道获取和绑定套接字的API。
public abstract class ServerSocketChannel
extends AbstractSelectableChannel
implements NetworkChannel
{
// 构造函数
protected ServerSocketChannel(SelectorProvider provider) {
super(provider);
}
// 打开获取当前的通道
public static ServerSocketChannel open() throws IOException {
// 单例模式下通过provider获取通道
return SelectorProvider.provider().openServerSocketChannel();
}
// 获取选择器的感兴趣事件编码
public final int validOps() {
return SelectionKey.OP_ACCEPT;
}
// 绑定套接字
public final ServerSocketChannel bind(SocketAddress local)
throws IOException
{
return bind(local, 0);
}
public abstract ServerSocketChannel bind(SocketAddress local, int backlog)
throws IOException;
public abstract <T> ServerSocketChannel setOption(SocketOption<T> name, T value)
throws IOException;
public abstract ServerSocket socket();
public abstract SocketChannel accept() throws IOException;
@Override
public abstract SocketAddress getLocalAddress() throws IOException;
}
Selector
Selector
Selector接口中声明了几个需要实现的函数,其中比较重要的就是:
- open() 获取选择器。
- select() 获取就绪通道数量。
- selectedKeys() 获取就绪通道的集合。
public abstract class Selector implements Closeable {
// 默认构造函数
protected Selector() { }
public static Selector open() throws IOException {
// 单例模式下通过provider获取选择器
return SelectorProvider.provider().openSelector();
}
public abstract boolean isOpen();
public abstract SelectorProvider provider();
public abstract Set<SelectionKey> keys();
public abstract Set<SelectionKey> selectedKeys();
public abstract int selectNow() throws IOException;
public abstract int select(long timeout)
throws IOException;
public abstract int select() throws IOException;
public abstract Selector wakeup();
public abstract void close() throws IOException;
}
SelectorProvider
JDK就是通过SelectorProvider来获取选择器和通道的。
public abstract class SelectorProvider {
private static final Object lock = new Object();
private static SelectorProvider provider = null;
protected SelectorProvider() {
SecurityManager sm = System.getSecurityManager();
if (sm != null)
sm.checkPermission(new RuntimePermission("selectorProvider"));
}
private static boolean loadProviderFromProperty() {
String cn = System.getProperty("java.nio.channels.spi.SelectorProvider");
if (cn == null)
return false;
try {
Class<?> c = Class.forName(cn, true,
ClassLoader.getSystemClassLoader());
provider = (SelectorProvider)c.newInstance();
return true;
} catch (ClassNotFoundException x) {
throw new ServiceConfigurationError(null, x);
} catch (IllegalAccessException x) {
throw new ServiceConfigurationError(null, x);
} catch (InstantiationException x) {
throw new ServiceConfigurationError(null, x);
} catch (SecurityException x) {
throw new ServiceConfigurationError(null, x);
}
}
private static boolean loadProviderAsService() {
ServiceLoader<SelectorProvider> sl =
ServiceLoader.load(SelectorProvider.class,
ClassLoader.getSystemClassLoader());
Iterator<SelectorProvider> i = sl.iterator();
for (;;) {
try {
if (!i.hasNext())
return false;
provider = i.next();
return true;
} catch (ServiceConfigurationError sce) {
if (sce.getCause() instanceof SecurityException) {
// Ignore the security exception, try the next provider
continue;
}
throw sce;
}
}
}
// 单例模式实现provider获取
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
// 如果之前构造过provider,那就直接返回
return provider;
// 构造一个新的provider
// doPrivileged属于特权操作,意思是不管此方法由哪个用户发起,都无需对此操作涉及的资源(文件读写特权等等)进行检查
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
//由JDK的参数-Djava.nio.channels.spi.SelectorProvider=class设置的class来反射构造SelectorProvider
if (loadProviderFromProperty())
return provider;
//从jar中的目录META-INF/services配置文件中找参数java.nio.channels.spi.SelectorProvider=class设置的第一个class来反射构造SelectorProvider
if (loadProviderAsService())
return provider;
//调用不同操作系统版本的JDK里自带的sun.nio.ch.DefaultSelectorProvider来创建SelectorProvider
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
public abstract DatagramChannel openDatagramChannel()
throws IOException;
public abstract DatagramChannel openDatagramChannel(ProtocolFamily family)
throws IOException;
public abstract Pipe openPipe()
throws IOException;
public abstract AbstractSelector openSelector()
throws IOException;
public abstract ServerSocketChannel openServerSocketChannel()
throws IOException;
public abstract SocketChannel openSocketChannel()
throws IOException;
public Channel inheritedChannel() throws IOException {
return null;
}
}