在学习 Netty 的 EventLoop 线程模型之前,需要先了解 Java 的 Reactor 模式。
一、Reactor 模型简介
在网络编程过程中,服务器端最原始的方式就是通过一个循环来不断的监听端口是否有新的 socket 链接,如果有就直接处理,比如读取 socket 输入,写入输出等等,完成之后再开始下一次监听。这种方式有一个最大的问题,当前请求没有处理完之前,下一个请求只能阻塞着,直到当前请求处理完成,服务器吞吐量低。在并发量大的情况下,效率很低。
自然的,我们会想到使用线程来处理。当有新的 socket 链接时,创建一个线程,将后面的处理逻辑都交给前程处理,即一个请求(socket 链接)用一个线程处理,这样就不会产生阻塞了。虽然这样解决了吞吐量的问题,但是又带来新的问题,即在并发量大的情况下,会不断的产生新的线程。反复的线程创建会消耗大量的资源,而多个线程的上下文切换也会影响效率。
采用事件驱动的方式可以很好的解决这些问题。当有事件发生,比如连接建立、数据可读或可写时,调用独立的处理器进行处理,避免阻塞主流程,同时也可以使用有限的线程来处理多个事件,避免了线程过多消耗大量资源。
Reactor 模型是一种事件驱动机制。一般的,应用程序通过主动调用某些 API 来完成处理,而 Reactor 却相反,应用程序提供相应的接口(回调函数)注册到 Reactor 上,如果有相应的事件发生,Reactor 将主动调用之前注册的接口。Reactor 可以同时接收多个请求,然后将它们分发到对应的处理器上。
Reactor 模型有以下几个主要部分:
- Selector:事件通知器
- Handler:事件处理器
- SelectionKey:事件标识
接下来使用之前的 demo,通过 Java 相关 API 实现 Server 端来学习 Reactor 模型的几种形态。
复用 demo 中 Netty 的 Client 端,连接建立时发送消息到 Server,接收 Server 返回的消息打印在控制台,以下是 Client 端的代码示例:
package com.niklai.demo;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
public class Client {
private static final Logger logger = LoggerFactory.getLogger(Client.class.getSimpleName());
public static void init() {
try {
Bootstrap bootstrap = new Bootstrap();
NioEventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress("localhost", 9999))
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture future = bootstrap.connect().sync();点
future.channel().closeFuture().sync();
group.shutdownGracefully().sync();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
static class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String msg = "Client message!";
ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
logger.info("read message: {}....", buf.toString(CharsetUtil.UTF_8));
}
}
}
二、单线程的 Reactor 模型
所谓单线程的 Reactor,就是只有一个线程来通知和处理事件。
package com.niklai.demo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;
public class ReactorServer {
private static final Logger logger = LoggerFactory.getLogger(ReactorServer.class.getSimpleName());
private Selector selector;
private boolean loop = true;
private ServerSocketChannel serverChannel;
public ReactorServer() {
try {
selector = Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
}
public void init() {
try {
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
ServerSocket socket = serverChannel.socket();
socket.bind(new InetSocketAddress("localhost", 9999));
SelectionKey selectionKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT); // 注册连接接受事件
selectionKey.attach(new Accept(serverChannel, selector)); // 绑定连接接受事件的处理器
while (loop) {
int select = selector.select(); // 阻塞获取当前是否有事件触发
if (select != 0) {
Set<SelectionKey> readKeys = selector.selectedKeys(); // 获取触发的事件
Iterator<SelectionKey> iterator = readKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
Runnable runnable = (Runnable) key.attachment(); // 获取事件绑定的处理器并执行
runnable.run();
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
// 连接建立时的事件处理器
static class Accept implements Runnable {
private ServerSocketChannel channel;
private Selector selector;
public Accept(ServerSocketChannel channel, Selector selector) {
this.channel = channel;
this.selector = selector;
}
@Override
public void run() {
try {
SocketChannel client = channel.accept();
new Handler(selector, client);
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 读、写事件处理器
static class Handler implements Runnable {
private Selector selector;
private SocketChannel socket;
private SelectionKey selectionKey;
private HandleState state;
public Handler(Selector selector, SocketChannel socket) {
this.selector = selector;
this.socket = socket;
this.state = HandleState.READING;
try {
this.socket.configureBlocking(false);
this.selectionKey = this.socket.register(this.selector, 0);
this.selectionKey.interestOps(SelectionKey.OP_READ); // 注册读事件
this.selectionKey.attach(this); // 当前类就是读写处理器
this.selector.wakeup();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
switch (state) {
case READING:
read();
break;
case WRITING:
write();
break;
}
}
private void read() {
StringBuffer sb = new StringBuffer();
ByteBuffer buf = ByteBuffer.allocate(1024);
try {
while (true) {
buf.clear();
int read = socket.read(buf);
sb.append(Charset.forName("utf-8").newDecoder().decode(buf.asReadOnlyBuffer()).toString());
if (read == 0) {
logger.info("receive message: {}.....", sb.toString());
Thread.sleep(2000); // 模拟读取处理数据逻辑比较耗时
selectionKey.interestOps(SelectionKey.OP_WRITE); // 注册写事件
state = HandleState.WRITING;
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void write() {
try {
ByteBuffer output = ByteBuffer.wrap("Reactor server answer!".getBytes());
socket.write(output.duplicate());
} catch (IOException e) {
e.printStackTrace();
} finally {
selectionKey.cancel(); // 整个读写流程处理完,取消事件注册,避免再次触发。
}
}
private enum HandleState {
READING, WRITING
}
}
}
通过 Selector.open()方法创建一个 Selector 事件通知器,ServerSocketChannel 注册到 Selector 上监听 ACCEPT 事件,并绑定对应的 Accept 事件处理器。当连接建立后触发对应事件,创建 SocketChannel 并再次注册到同一个 Selector 上,并监听读写事件,同时绑定对应的 Handler 事件处理器。等待 READ 事件触发读取数据完成业务处理后触发写事件完成写数据。整个过程中,所有事件都由同一个 Selector 触发和处理。
运行单元测试,并行初始化 6 个 Client 连接到 Server 上,查看控制台
@Test
public void test() throws InterruptedException {
new Thread(() -> {
// 服务端
new ReactorServer().init();
}).start();
Thread.sleep(1000);
// 并行初始化多个Client,模拟并发效果
IntStream.range(1, 7).parallel().forEach(item -> {
logger.info("Client No.{} init...", item);
// 客户端
Client.init();
});
}
从控制台日志中可以看到,虽然 Client 是并行初始化连接到 Server 的,但是在 Server 端却是同一个线程依次处理的,每次处理耗时 2ms。这就是单线程 Reactor 的特点,资源利用率不高,在高并发的情况下效率会非常的低,甚至会因为某些处理逻辑耗时太长导致后面的连接被拒绝。
三、多线程的 Reactor 模型
为了解决上面的问题,我们可以考虑将耗时的操作放在线程里执行,这样可以避免 Selector 被阻塞。将 demo 中的 ReactorServer 改造一下
// 省略部分代码
static class Handler implements Runnable {
private Selector selector;
private SocketChannel socket;
private SelectionKey selectionKey;
private HandleState state;
private ExecutorService pool;
public Handler(Selector selector, SocketChannel socket) {
this.selector = selector;
this.socket = socket;
this.state = HandleState.READING;
this.pool = Executors.newFixedThreadPool(4); // 增加一个线程池
try {
this.socket.configureBlocking(false);
this.selectionKey = this.socket.register(this.selector, 0);
this.selectionKey.interestOps(SelectionKey.OP_READ);
this.selectionKey.attach(this);
this.selector.wakeup();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
switch (state) {
case READING:
state = HandleState.WORKING; // 改变当前handler状态
pool.execute(() -> { // 将耗时操作放在线程池中执行
read();
});
break;
case WRITING:
write();
break;
}
}
private void read() {
StringBuffer sb = new StringBuffer();
ByteBuffer buf = ByteBuffer.allocate(1024);
try {
while (true) {
buf.clear();
int read = socket.read(buf);
sb.append(Charset.forName("utf-8").newDecoder().decode(buf.asReadOnlyBuffer()).toString());
if (read == 0) {
logger.info("receive message: {}.....", sb.toString());
Thread.sleep(2000);
selectionKey.interestOps(SelectionKey.OP_WRITE);
state = HandleState.WRITING;
selector.wakeup(); // 唤醒Selector,让当前阻塞的selector.select()方法返回
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void write() {
try {
ByteBuffer output = ByteBuffer.wrap("Reactor server answer!".getBytes());
socket.write(output.duplicate());
} catch (IOException e) {
e.printStackTrace();
} finally {
selectionKey.cancel();
}
}
private enum HandleState {
WORKING,
READING,
WRITING
}
}
运行上面的单元测试,查看控制台
与之前运行的结果不同,耗时的 Read 方法都使用线程池的线程执行,整个流程相比之前总耗时要小很多。
四、主从 Reactor 多线程模型
从前面的例子可以看到,即使是使用了线程池,从头至尾都是一个 Selector 在负责事件分发和处理。当在分发之前的逻辑存在耗时长的情况时,会影响到其他事件的触发。这样我们可以将其分成两部分:监听并建立连接可以由一个独立的 Selector 负责,而读写和业务操作可以由另外一个 Selector 负责,并且前一个 Selector 将建立好的连接分派给后一个 Selector。继续修改 demo,实现上述过程。
package com.niklai.demo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ReactorServer {
private static final Logger logger = LoggerFactory.getLogger(ReactorServer.class.getSimpleName());
private Selector mainSelector; // 主Selector
private Selector[] subSelectors; // 从Selector
private int next = 0;
private int count = 2;
private boolean loop = true;
private ServerSocketChannel serverChannel;
public ReactorServer() {
try {
subSelectors = new Selector[count]; // 初始化多个从Selector,复用
for (int i = 0; i < count; i++) {
subSelectors[i] = Selector.open();
}
mainSelector = Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
}
public void init() {
try {
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
ServerSocket socket = serverChannel.socket();
socket.bind(new InetSocketAddress("localhost", 9999));
SelectionKey selectionKey = serverChannel.register(mainSelector, SelectionKey.OP_ACCEPT);
selectionKey.attach((Runnable) () -> {
try {
SocketChannel client = serverChannel.accept();
new Handler(subSelectors[next], client); // 每建立一个连接,就选择一个从Selector绑定
next++;
if (next == count) {
next = 0;
}
} catch (Exception e) {
e.printStackTrace();
}
});
// 每个从Selector使用独立的线程监听事件,避免相互阻塞
for (int i = 0; i < count; i++) {
int finalI = i;
new Thread(() -> {
new HandlerLoop(subSelectors[finalI]).run();
}).start();
}
while (loop) {
int select = mainSelector.select();
if (select != 0) {
Set<SelectionKey> readKeys = mainSelector.selectedKeys();
Iterator<SelectionKey> iterator = readKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
Runnable runnable = (Runnable) key.attachment();
runnable.run();
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
static class HandlerLoop implements Runnable {
private Selector selector;
public HandlerLoop(Selector selector) {
this.selector = selector;
}
@Override
public void run() {
while (!Thread.interrupted()) {
try {
int select = selector.select();
if (select != 0) {
Set<SelectionKey> readKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
Runnable runnable = (Runnable) key.attachment();
runnable.run();
iterator.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
static class Handler implements Runnable {
// 不变,省略代码
}
}
运行上面的单元测试,查看控制台
在这里只有两个从 Selector,却同时处理了六个 Socket 连接,可以达到使用较少的线程来处理大量的请求。
进一步的优化
在上面的例子里,每 accept 一个 SocketChannel,就会创建一个 handler,虽然多个 SocketChannel 共享了一个 Selector,但是每个 SocketChannel 对应的 handler 里都有一个独立的线程池。在高并发下,会创建大量的线程池,消耗大量的资源。为了解决这个问题,我们可以给每一个 Selector 分配一个线程池,用同一个线程池的线程来处理同一个 Selector 关联下的多个 SocketChannel。
public class ReactorServer {
// 省略代码
private ExecutorService[] pools; // 线程池数组
public ReactorServer() {
try {
subSelectors = new Selector[count];
pools = new ExecutorService[count]; // 初始化线程池组,数组大小与从Selector数组大小相同
for (int i = 0; i < count; i++) {
subSelectors[i] = Selector.open();
pools[i] = Executors.newFixedThreadPool(10); // 初始化线程池
}
mainSelector = Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
}
public void init() {
try {
// 省略代码
selectionKey.attach((Runnable) () -> {
try {
SocketChannel client = serverChannel.accept();
new Handler(subSelectors[next], client, pools[next]); // 选择一个线程池给Handler
next++;
if (next == count) {
next = 0;
}
} catch (Exception e) {
e.printStackTrace();
}
});
// 省略代码
} catch (Exception e) {
e.printStackTrace();
}
}
static class Handler implements Runnable {
// 省略代码
public Handler(Selector selector, SocketChannel socket) {
this(selector, socket, Executors.newFixedThreadPool(4));
}
public Handler(Selector selector, SocketChannel socket, ExecutorService pool) {
this.selector = selector;
this.socket = socket;
this.state = HandleState.READING;
this.pool = pool;
try {
this.socket.configureBlocking(false);
this.selectionKey = this.socket.register(this.selector, 0);
this.selectionKey.interestOps(SelectionKey.OP_READ);
this.selectionKey.attach(this);
this.selector.wakeup();
} catch (IOException e) {
e.printStackTrace();
}
}
// 省略代码
}
}