NIO 源码分析(05) Channel 源码分析
Netty 系列目录(https://www.cnblogs.com/binarylei/p/10117436.html)
一、Channel 类图
功能说明:
InterruptibleChannel、AbstractInterruptibleChannel
提供了 Channel 响应 thread.interrupt(),支持中断操作,最重要的两个方法是 begin 和 end。SelectableChannel、AbstractSelectableChannel
提供了 Channel 注册到 Selector 的各种方法。ReadableByteChannel、ScatteringByteChannel
Channel 读数据。WritableByteChannel、GatheringByteChannel
Channel 写数据。NetworkChannel
Channel 进行端口绑定、参数设置等网络相关的操作。SocketChannel
Socket 门面,由 SelectorProvider.openSocketChannel 提供具体的实现类。ServerSocketChannel
ServerSocket 门面,由 SelectorProvider.openServerSocketChannel 提供具体的实现类。
public ServerSocketChannel openServerSocketChannel() throws IOException {
return new ServerSocketChannelImpl(this);
}
public SocketChannel openSocketChannel() throws IOException {
return new SocketChannelImpl(this);
}
在 SelectorProviderImpl 中默认的 ServerSocketChannel 实现类是 ServerSocketChannelImpl。
二、begin 和 close 是什么
begin() 和 end() 总是配对使用的,Channel 和 Selector 均有自己的实现,所完成的功能也是有所区别的。通常这两个方法的使用如下:
2.1 AbstractInterruptibleChannel 中的 begin 和 close
boolean completed = false;
try {
begin();
completed = ...; // Perform blocking I/O operation
return ...; // Return result
} finally {
end(completed);
}
AbstractInterruptibleChannel 中最重要的方法是 begin 和 end,它们的功能是什么呢?
protected final void begin() {
if (interruptor == null) {
interruptor = new Interruptible() {
public void interrupt(Thread target) {
synchronized (closeLock) {
if (!open)
return;
open = false;
interrupted = target;
try {
// Channel 中的 begin 就做了一件事,关闭 channel
AbstractInterruptibleChannel.this.implCloseChannel();
} catch (IOException x) { }
}
}};
}
blockedOn(interruptor); // t.blockedOn(b)
Thread me = Thread.currentThread();
if (me.isInterrupted())
interruptor.interrupt(me);
}
protected final void end(boolean completed) throws AsynchronousCloseException {
blockedOn(null);
Thread interrupted = this.interrupted;
if (interrupted != null && interrupted == Thread.currentThread()) {
interrupted = null;
throw new ClosedByInterruptException();
}
if (!completed && !open)
throw new AsynchronousCloseException();
}
总结: Channel 中的 begin 就干了一件事,关闭 channel。我们先试想这样一个场景,Channel 要读写数据时所在线程被中断了,会发生什么事?线程既然被中断了,Channel 总要关闭吧。事实上 Channel 的 begin 和 end 就是做这个事情的。不过要理解 begin 和 end,似乎我们先得弄明白 AbstractInterruptibleChannel.blockedOn 究竟在干什么:
static void blockedOn(Interruptible intr) { // package-private
sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr);
}
其中 JavaLangAccess 接口在 java.lang.System 中被实例化,它是这样写的:
private static void setJavaLangAccess() {
// Allow privileged classes outside of java.lang
sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){
public void blockedOn(Thread t, Interruptible b) {
t.blockedOn(b);
}
}
}
现在我们发现,JavaLangAccess 的 blockedOn 实现,居然只有这么一句 t.blockedOn(b)。继续跟踪到 java.lang.Thread 中 blockedOn 的实现了:
private volatile Interruptible blocker;
void blockedOn(Interruptible b) {
synchronized (blockerLock) {
blocker = b;
}
}
实际上就是 Thread 线程上注册了一个钩子方法,当线程中断时,即调用 thread.interrupt() 时会回调这个 b.interrupt(this) 方法,进而关闭线程对应的 Channel。
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}
2.2 Selector 中的 begin 和 end
protected final void begin() {
if (interruptor == null) {
interruptor = new Interruptible() {
public void interrupt(Thread ignore) {
// 线程中断时唤醒 selector
AbstractSelector.this.wakeup();
}};
}
AbstractInterruptibleChannel.blockedOn(interruptor);
Thread me = Thread.currentThread();
if (me.isInterrupted())
interruptor.interrupt(me);
}
protected final void end() {
AbstractInterruptibleChannel.blockedOn(null);
}
总结: Selector 中的 begin 和 end 就是在线程中断时唤醒对应的 selector,对应的使用如下:
protected int doSelect(long timeout) throws IOException {
try {
begin();
pollWrapper.poll(timeout);
} finally {
end();
}
// ...
}
三、Channel 注册
3.1 AbstractSelectableChannel 与 Channel 注册相关属性
private SelectionKey[] keys = null;
private int keyCount = 0;
// Lock for key set and count
private final Object keyLock = new Object();
// Blocking mode, protected by regLock
boolean blocking = true;
// Lock for registration and configureBlocking operations
private final Object regLock = new Object();
-
keys、keyCount
Channel 注册后的 SelectionKey 集合和数量,也就是说一个 Channel 可以注册到多个 Selector 上。keyLock 保证多线程下 keys、keyCount 操作时的数据安全。 -
blocking
Channel 是否是阻塞的。regLock 保证多线程下 blocking 操作的线程安全,同时注册时也需要加锁。
3.2 register 方法
public final SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException {
synchronized (regLock) {
if (!isOpen())
throw new ClosedChannelException();
// 注册的感兴趣事件不合法
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (blocking)
throw new IllegalBlockingModeException();
// 1. 判断 Channel 是否是在这个 Selector 上是否已经注册
SelectionKey k = findKey(sel);
// 2. 如果已经注册,更新感兴趣事件
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
// 3. 如果没有注册,委托 seletor.register 完成注册
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}
总结: AbstractSelectableChannel 可以注册到多个 Selector 上,keys 属性管理了所有已经注册的 SelectionKey。对于已经注册的 SelectionKey 和未注册的处理逻辑并不同。具体步骤如下:
- 判断 Channel 是否是在这个 Selector 上是否已经注册。
- 如果 Channel 已经注册了,更新注册的感兴趣事件。
- 判断 Channel 未注册,则委托给 sel 完成注册。
3.3 SelectionKey.interestOps 事件注册
SelectionKeyImpl(SelChImpl ch, SelectorImpl sel) {
channel = ch;
selector = sel;
}
SelectionKeyImpl 是对 Channel 和 Seletor 的封装,具体的事件注册还是委托给了 channel 完成。
public SelectionKey interestOps(int ops) {
ensureValid();
return nioInterestOps(ops);
}
public SelectionKey nioInterestOps(int ops) {
if ((ops & ~channel().validOps()) != 0)
throw new IllegalArgumentException();
channel.translateAndSetInterestOps(ops, this);
interestOps = ops;
return this;
}
总结: SelectionKey.interestOps 事件注册绕了一圈,最后发现又委托给了 Channel 来完成。
// ServerSocketChannel 事件注册
public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
int newOps = 0;
// Translate ops
if ((ops & SelectionKey.OP_ACCEPT) != 0)
newOps |= Net.POLLIN;
// Place ops into pollfd array
sk.selector.putEventOps(sk, newOps);
}
总结: 事件注册需要注意的是 OP_READ、OP_WRITE、OP_CONNECT、OP_ACCEPT 只是 JDK 对底层 IO 事件的封装,在注册前需要将 JDK IO 事件转换成 Linux NIO 事件。最终事件的注册都是委托给了 Selector 完成,所以才说 Selector 才是 NIO 的核心。
// Net
public static final short POLLIN; // 读事件
public static final short POLLOUT; // 写事件
public static final short POLLERR;
public static final short POLLHUP;
public static final short POLLNVAL;
public static final short POLLCONN;
四、Channel.accept
public SocketChannel accept() throws IOException {
synchronized (lock) {
// 1. 连接校验
if (!isOpen())
throw new ClosedChannelException();
if (!isBound())
throw new NotYetBoundException();
SocketChannel sc = null;
// 2. newfd、isaa 如果有新的 socket 连接,则会通过 accept 赋值
int n = 0;
FileDescriptor newfd = new FileDescriptor();
InetSocketAddress[] isaa = new InetSocketAddress[1];
// 3. begin、end 配套使用则可以响应线程中断,关闭 channel
try {
begin();
if (!isOpen())
return null;
thread = NativeThread.current();
for (;;) {
// 4. 接收新的连接请求
n = accept(this.fd, newfd, isaa);
if ((n == IOStatus.INTERRUPTED) && isOpen())
continue;
break;
}
} finally {
thread = 0;
end(n > 0);
assert IOStatus.check(n);
}
if (n < 1)
return null;
// 4. socket 默认是阻塞的,所以如果是 NIO 编程需要手动设置成 false
IOUtil.configureBlocking(newfd, true);
InetSocketAddress isa = isaa[0];
sc = new SocketChannelImpl(provider(), newfd, isa);
// 5. 返回新的 socket 请求
return sc;
}
}
private int accept(FileDescriptor ssfd, FileDescriptor newfd,
InetSocketAddress[] isaa) throws IOException {
return accept0(ssfd, newfd, isaa);
}
总结: ServerSocketChannel 接收新的连接请求步骤和 BIO 类似,没有什么区别,大致有以下步骤:
- channel 连接校验
- begin、end 配套使用则可以响应线程中断,关闭 channel
- 调用 accept 接收新的连接请求
- socket 默认是阻塞的,所以如果是 NIO 编程需要手动设置成 false
五、Channel 关闭
close() 操作限于通道,而且还是实现了 InterruptibleChannel 接口的通道,例如 FileChannel 就没有 close 操作。
在分析 close() 具体实现之前,我们先得理解为什么要有 close() 这个操作:一个可选择的通道,在创建之初会生成一个 FileDescriptor,linux 下即为 fd,windows 下即为句柄,这些都是系统资源,不能无限占用,当在不使用的时候,就应该将其释放,close 即是完成这个工作的。
5.1 Channel.close
抽象类 AbstractInterruptibleChannel 实现了 InterruptibleChannel 接口,而 SelectableChannel 继承自 AbstractInterruptibleChannel,因此,可选择的通道同时也是可以 close 的。AbstractInterruptibleChannel 的 close 实现如下:
public final void close() throws IOException {
synchronized (closeLock) {
if (!open)
return;
open = false;
implCloseChannel();
}
}
来具体关闭逻辑就在 implCloseChannel() 中了,于是再看 AbstractSelectableChannel:
protected final void implCloseChannel() throws IOException {
implCloseSelectableChannel();
synchronized (keyLock) {
int count = (keys == null) ? 0 : keys.length;
for (int i = 0; i < count; i++) {
SelectionKey k = keys[i];
if (k != null)
k.cancel();
}
}
}
先看 synchronized 同步块,它将当前通道保存的 SelectionKey 全部 cancel,意思就是说,当前通关闭了,与它相关的所有 SelectionKey 都没有意义了,所以要全部取消掉,之前讲解 cancel 过程已经说明了,cancel 操作只是将 SelectionKey 加入对 应选择器的 cancelKeys 集合中,在下次正式选择开始的时候再一一清除;
这么看来,还是应该追究一下 implCloseSelectableChannel() 的实现了,下面分别从 ServerSocketChannel 和 SocketChannel 实现出发:
先看 ServerSocketChannelImpl
protected void implCloseSelectableChannel() throws IOException {
synchronized (stateLock) {
if (state != ST_KILLED)
nd.preClose(fd);
long th = thread;
if (th != 0)
NativeThread.signal(th);
if (!isRegistered())
kill();
}
}
出现了两个很奇怪的东西,看来要完全弄懂这段代码,是得好好分析一下它们了,它们是:NativeDispatcher nd 和 NativeThread;
如果已经对 linux 信号机制非常熟悉,应该很容易猜测到 NativeThread.signal(th) 在做什么,是的,它在唤醒阻塞的线程 th,下面我们来看看它是如何做到的:
5.2 NativeThread
NativeThread 类非常简单,几乎全是 native 方法:
class NativeThread {
static native long current();
static native void signal(long nt);
static native void init();
static {
Util.load();
init();
}
}
在看其本地实现:
//自定义中断信号,kill –l
#define INTERRUPT_SIGNAL (__SIGRTMAX - 2)
//自定义的信号处理函数,当前函数什么都不做
static void nullHandler(int sig) {
}
#endif
//NativeThread.init()的本地实现,可以看到它用到了sigaction
//sigaction用来install一个信号
JNIEXPORT void JNICALL
Java_sun_nio_ch_NativeThread_init(JNIEnv *env, jclass cl) {
#ifdef __linux__
sigset_t ss;
// 以下这段代码是常见的信号安装过程
// 讲解这段代码的目的只是为了让大家理解NativeThread.signal
// 的工作原理,故很多细节就简单带过了
struct sigaction sa, osa;
// sa用于定制信号INTERRUPT_SIGNAL的处理方式的
// 如sa_handler = nullHandler即用来指定信号处理函数的
// 即线程收到信号时,为执行这个函数,nullHandler是个空壳
// 函数,所以它什么都不做
// 不用理解sa_flags各个标识代表什么
// sigemptyset顾名思义,它是初始化sigaction的sa_mask位
// sigaction(INTERRUPT_SIGNAL, &sa, &osa)执行后
// 如果成功,则表示INTERRUPT_SIGNAL这个信号安装成功了
// 为什么要有这个init呢,其实不用这不操作也许不会有问题
// 但因为不能确保INTERRUPT_SIGNAL没有被其他线程install
// 过,如果sa_handler对应函数不是空操作,则在使用这个信号
// 时会对当前线程有影响
sa.sa_handler = nullHandler;
sa.sa_flags = 0;
sigemptyset(&sa.sa_mask);
if (sigaction(INTERRUPT_SIGNAL, &sa, &osa) < 0)
JNU_ThrowIOExceptionWithLastError(env, "sigaction");
#endif
}
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_NativeThread_current(JNIEnv *env, jclass cl) {
#ifdef __linux__
// pthread_self()即是获取当前线程ID,它与getpid()是不同的
// 具体细节没有研究
return (long)pthread_self();
#else
return -1;
#endif
}
JNIEXPORT void JNICALL
Java_sun_nio_ch_NativeThread_signal(JNIEnv *env, jclass cl, jlong thread) {
#ifdef __linux__
//这个就是最关键的signal实现了,可以看到,它调用了pthread库的pthread_kill
//像thread线程发送一个INTERRUPT_SIGNAL信号,这个信号就是在init中install
//的,对应的处理函数是空函数,也就是说,往thread线程发送一个信号,如果该线程处于
//阻塞状态,则会因为受到信号而终止阻塞,而如果处于非阻塞,则无影响
if (pthread_kill((pthread_t)thread, INTERRUPT_SIGNAL))
JNU_ThrowIOExceptionWithLastError(env, "Thread signal failed");
#endif
}
Java 的 NativeThread 做静态初始化时已经执行了 init,也就是说 INTERRUPT_SIGNAL 信号已经被安装,而 ServerSocketChannelImpl 在上述 accept 时可能赋值。
try {
begin();
if (!isOpen())
return null;
thread = NativeThread.current();
for (;;) {
n = accept0(this.fd, newfd, isaa);
if ((n == IOStatus.INTERRUPTED) && isOpen())
continue;
break;
}
} finally {
thread = 0;
end(n > 0);
assert IOStatus.check(n);
}
try 的内部,for 循环之前,thread 被复制为 NativeThread.current() 即为当前线程 id;finally 时 thread 又被修改回 0,因此在 implCloseSelectableChannel 才有这样一段:
if (th != 0)
NativeThread.signal(th);
NativeThread.signal(th) 通过像当前线程发送 INTERRUPT_SIGNAL 信号而确保 th 线程没有被阻塞,即如果阻塞就停止阻塞。
5.3 NativeDispatcher
现在理解了 NativeThread 了,我们再看 NativeDispatcher
首先我们得知道在 ServerSocketChannelImpl 中,nd 被初始化为 SocketDispatcher,见:
static {
Util.load();
initIDs();
nd = new SocketDispatcher();
}
又因为 linux 下一切皆文件的思想(现实虽然不绝对),SocketDispatcher 其实就是用 FileDispatcher 实现的,最终 FileDispatcher 也只是封装了一大堆 native 方法,一波三折,关于 FileDispatcher,这里先不详细讲解了,先针对 nd.preClose(fd) 和 kill 将 implCloseSelectableChannel 的过程说明白吧:
首先,我们要明白这样一个道理:在多线程环境下,总是很难知道什么时候可安全的关闭或释放资源(如fd),当一个线程 A 使用 fd 来读写,而另一个线程 B 关闭或释放了 fd,则 A 线程就会读写一个错误的文件或 socket;为了防止这种情况出现,于是 NIO 就采用了经典的 two-step 处理方案:
第一步:创建一个 socket pair,假设 FDs 为 sp[2],先 close 掉 sp[1],这样,该 socket pair 就成为了一个半关闭的链接;复制 (dup2)sp[0] 到 fd(即为我们想关闭或释放的fd),这个时候,其他线程如果正在读写立即会获得 EOF 或者 Pipe Error,read 或 write 方法里会检测这些状态做相应处理;
第二步:最后一个会使用到 fd 的线程负责释放
nd.preClose(fd) 即为两步曲中的第一步,我们先来看其实现,最终定位到 FileDispatcher.c,相关代码如下:
static int preCloseFD = -1;
JNIEXPORT void JNICALL
Java_sun_nio_ch_FileDispatcher_init(JNIEnv *env, jclass cl)
{
int sp[2];
if (socketpair(PF_UNIX, SOCK_STREAM, 0, sp) < 0) {
JNU_ThrowIOExceptionWithLastError(env, "socketpair failed");
return;
}
preCloseFD = sp[0];
close(sp[1]);
}
JNIEXPORT void JNICALL
Java_sun_nio_ch_FileDispatcher_preClose0(JNIEnv *env, jclass clazz, jobject fdo)
{
jint fd = fdval(env, fdo);
if (preCloseFD >= 0) {
if (dup2(preCloseFD, fd) < 0)
JNU_ThrowIOExceptionWithLastError(env, "dup2 failed");
}
}
从上面两个函数实现,我们可以看到,在 init 函数中,创建了一个半关闭的 socket pair,preCloseFD 即为未关闭的一端,init 在静态初始化时就会被执行;再来看关键的 preClose0,它的确是采用 dup2 来复制 preCloseFD,这样一来,fd 就被替换成了 preCloseFD,这正是 socket pair 中未被关闭的一端。
既然 nd.preClose(fd) 只是预关闭,则真正执行关闭的逻辑肯定在这个 kill 中了,从代码逻辑上还是比较好懂的,if (!isRegistered()) 即表示该通道没有被注册,表示所有 Selector 都没有意愿关心这个通道了,则自然可以放心的关闭 fd。
果断猜测 kill 中有 nd.close(fd) 这样的代码,不信请看:
public void kill() throws IOException {
synchronized (stateLock) {
if (state == ST_KILLED)
return;
if (state == ST_UNINITIALIZED) {
state = ST_KILLED;
return;
}
assert !isOpen() && !isRegistered();
nd.close(fd);
state = ST_KILLED;
}
}
果然如此,这样一来,关闭二步曲就能够较安全的释放我们的fd资源了,至于 nd.close(fd)的本地实现,这里就不讲了,肯定是采用了 close(fd) 的系统调用。总的来说,通道的 close 就是为了断开它与内核 fd 的那点联系。
每天用心记录一点点。内容也许不重要,但习惯很重要!