当下服务器端的网络服务为实现高并发,高吞吐量使用的最为流行的模式是事件驱动(event driven)模式,Reactor模式是事件驱动模式的一种。
说起I/O的Reactor模式,最经典的文章是Doug Lea的《Scalable IO in Java》,里面介绍了Reactor模式的三种线程模型:单线程,多线程和主从Reactor模型。文章短短几十页,但是说明了高并发IO的线程模型的基础。但是这篇文章比较侧重于基础,也过于简洁,要想在其基础上实现高并发需要有额外的处理。本文先解释论文中的的多线程模型,然后探讨如何改进,进而实现更贴近实际应用的高性能线程模型。
读懂本文需要对Selector有基本的了解,如果不了解,可以读一下我的另一篇文章:《以最简单易懂的方式介绍I/O模型》。
1. Reactor多线程模型
1.1 大部分网络服务包括以下处理步骤
read request(读取客户端发送过来的byte数据)
decode request(把byte数据解码成特定类型的数据)
process (compute) service(根据请求数据进行业务处理)
encode reply(把处理结果转换成byte数据)
send reply(发送byte数据给客户端)
1.2 Reactor多线程模型图(来自于Doug Lea的文章)
Reactor - 负责响应IO事件,把事件分发给相应的处理代码。Reactor运行在一个独立的线程中(非Thread Pool中的线程)。具体来说,Reactor主要有两个职责,一个是处理来自客户端的连接事件,处理代码由acceptor实现;另一个是处理读取和发送数据的事件,处理代码由Handler实现。
Acceptor - 用以接受客户端的连接请求,然后创建Handler对连接进行后续的处理(读取,处理,发送数据)。
Handler - 事件处理类,用以实现具体的业务逻辑。图中read,decode,compute,encode和send都是由handler实现的。
Thread Pool - Thread Pool中的thread被称作worker thread。Handler中的decode,compute和encode是用worker thread执行的。值得注意的是Handler中的read和send方法是在Reactor线程而不是worker thread中执行的。这意味着对socket数据的读取发送数据和对数据的处理是在不同的线程中进行的.
1.3 Reactor多线程模型的主要问题
- read和send会影响接受客户端连接的性能 前面分析过read和send是在Reactor线程中执行的,接受客户端的连接请求也是在Reactor线程中执行。这使得如果有read或者send耗时较长,会影响其他客户端连接的速度。
- Read和send性能不够高效 网络服务对于来自同一客户端的read和send是串行的,但是对于不同客户端之间的read和send是可以并行进行的。由于read和send运行在Reactor单线程中,不能充分发挥硬件能力。
- 线程上下文切换带来额外开销 前面提到的处理客户端请求的步骤依次是read,decode,process,encode,send。由于read和send是在Reactor线程中执行,而decode,process和encode是在worker thread线程中执行,引入了额外的线程切换开销,这种开销在高并发的时候会体现出来。
2. 实际应用中的多线程模型
Doug Lea文章中的”主从Reactor“模式可以解决上述第一个和第二个问题。它把接受客户端连接的
“主Reactor”单独运行在一个线程中,“从Reactor”有多个,组成一个Reactor Pool,每个”从Reactor“都运行在一个独立的线程上,具有自己的selector和dispatch loop。但第三个问题还是没有解决,read和send仍然是运行在”从Reactor“线程上,而decode,process和encode运行在worker thread上。
要解决第三个问题,可以采用下面的方式。Reactor线程专门用于接受客户端连接(通过acceptor);创建多个Event Loop ,组成Event Loop Pool,每个Event Loop都有自己的Selector,并且运行在独立的线程上;Acceptor对于每一个客户端的连接从EventLoopPool中选择一个Event Loop进行处理,并且保证每个客户端连接在整个生命周期中都是由同一个Event Loop线程来处理,从而使得Handler中的实现-read,decode,process,encode,send-都在同一个线程中执行。整个线程模型除了高效的性能,还有非常重要的一点是Handler的实现不需要加锁,一方面对性能有帮助,另一方面避免多线程编程的复杂度。
2.1 改进后的模型图
3. 代码示例及注解
class Reactor implements Runnable { final Selector selector; final ServerSocketChannel serverSocket; static final int EVENT_LOOP_POOL_SIZE = 4; final EventLoop[] eventLoopPool = new EventLoop[EVENT_LOOP_POOL_SIZE]; int currentIndex = 0; Reactor(int port) throws IOException { // 创建并设置ServerSocketChannel对象 serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(port)); serverSocket.configureBlocking(false); // 创建Selector对象,为ServerSocketChannel对象注册OP_ACCEPT事件 selector = Selector.open(); SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); // 在SelectionKey对象中保存Acceptor对象, // 这样Acceptor对象可以传递给OP_ACCEPT事件发生时的处理方法 sk.attach(new Acceptor()); // 创建EventLoop对象,每个EventLoop对象会启动一个线程 initEventLoopPool(); } void initEventLoopPool() { try { for (int i = 0; i < EVENT_LOOP_POOL_SIZE; i++) { eventLoopPool[i] = new EventLoop(); } } catch (IOException ex) { /* ... */ } } public void run() { // 运行在独立的线程 try { while (!Thread.interrupted()) { selector.select(); // 当没有客户端连接时,此语句会阻塞 // 执行到以下语句意味着有客户端连接了 Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) { // 取出保存在SelectionKey中的Acceptor对象,处理连接请求 SelectionKey sk = ((SelectionKey) (it.next())); Acceptor acceptor = (Acceptor)sk.attachment(); acceptor.accept(); } selected.clear(); } } catch (IOException ex) { /* ... */ } } class Acceptor { public void accept() { try { // 每个SocketChannel对象都代表一个和客户端的TCP连接, // 读取客户端发过来的数据和发送数据给客户端都是通过SocketChannel对象进行 SocketChannel c = serverSocket.accept(); if (c != null) // SocketChannel对象的数据读写和处理逻辑是由Handler实现的, // Handler对象的方法是在选中的某一个EventLoop的线程中执行的(为什么?答案在Handler实现中) new Handler(nextEventLoop().selector, c); } catch(IOException ex) { /* ... */ } } EventLoop nextEventLoop() { // 循环选择EventLoop(Round Robin方式),使得每个EventLoop负载均衡 return eventLoopPool[(currentIndex++) % EVENT_LOOP_POOL_SIZE]; } } }
final class EventLoop implements Runnable { final Selector selector; EventLoop() throws IOException { // 每个EventLoop都有属于自己的Selector对象, // 并且运行在独立的线程上 selector = Selector.open(); new Thread(this).start(); } public void run() { // normally in a new Thread try { while (!Thread.interrupted()) { selector.select(); Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) dispatch((SelectionKey) (it.next())); selected.clear(); } } catch (IOException ex) { /* ... */ } } void dispatch(SelectionKey k) { Runnable r = (Runnable) (k.attachment()); if (r != null) r.run(); } }
final class Handler implements Runnable { static final int MAXIN = 5000; static final int MAXOUT = 10000; final SocketChannel socket; final SelectionKey sk; ByteBuffer input = ByteBuffer.allocate(MAXIN); ByteBuffer output = ByteBuffer.allocate(MAXOUT); static final int READING = 0, SENDING = 1; int state = READING; Handler(Selector sel, SocketChannel c) throws IOException { socket = c; c.configureBlocking(false); // 为什么此Handler会在EventLoop中执行? // 因为入参Selector对象来自于EventLoop, // EventLoop的线程负责调用此Selector对象的select方法等待事件的发生, // 事件发生后调用SelectionKey.attachment上的run方法, // 由于attachment就是此Handler对象,所以相当于在EventLoop线程中调用此Handler的run方法。 sk = socket.register(sel, 0); sk.attach(this); sk.interestOps(SelectionKey.OP_READ); sel.wakeup(); } boolean inputIsComplete() { /* ... */ } boolean outputIsComplete() { /* ... */ } void process() { /* ... */ } public void run() { try { if (state == READING) read(); else if (state == SENDING) send(); } catch (IOException ex) { /* ... */ } } void read() throws IOException { socket.read(input); // inputIsComplete方法是干什么用的? // 客户端发送过来的数据并不一定是全部放到buffer后再通知selector说数据就绪了, // 大部分情况是部分数据放到缓存以后就通知selector说数据就绪了。所以此read方法可能会被调用多次。 // 用户需要在客户端和服务端约定好怎么样才算数据发送完了, // 比如可以用一个特殊的字符结尾,或者先用发送过来的头两个字节代表数据的长度等。 if (inputIsComplete()) { process(); state = SENDING; // Normally also do first write now sk.interestOps(SelectionKey.OP_WRITE); } } void send() throws IOException { socket.write(output); if (outputIsComplete()) sk.cancel(); } }
第一时间读文章,搜索作者公众号“码年”,或者扫码关注: