zoukankan      html  css  js  c++  java
  • 满足高并发的I/O Reactor线程模型 (附图,附代码)

    当下服务器端的网络服务为实现高并发,高吞吐量使用的最为流行的模式是事件驱动(event driven)模式,Reactor模式是事件驱动模式的一种。

    说起I/O的Reactor模式,最经典的文章是Doug Lea的《Scalable IO in Java》,里面介绍了Reactor模式的三种线程模型:单线程,多线程和主从Reactor模型。文章短短几十页,但是说明了高并发IO的线程模型的基础。但是这篇文章比较侧重于基础,也过于简洁,要想在其基础上实现高并发需要有额外的处理。本文先解释论文中的的多线程模型,然后探讨如何改进,进而实现更贴近实际应用的高性能线程模型。

    读懂本文需要对Selector有基本的了解,如果不了解,可以读一下我的另一篇文章:《以最简单易懂的方式介绍I/O模型》。

    1. Reactor多线程模型

    1.1 大部分网络服务包括以下处理步骤
    read request(读取客户端发送过来的byte数据)
    decode request(把byte数据解码成特定类型的数据)
    process (compute) service(根据请求数据进行业务处理)
    encode reply(把处理结果转换成byte数据)
    send reply(发送byte数据给客户端)


    1.2 Reactor多线程模型图(来自于Doug Lea的文章)

    Reactor - 负责响应IO事件,把事件分发给相应的处理代码。Reactor运行在一个独立的线程中(非Thread Pool中的线程)。具体来说,Reactor主要有两个职责,一个是处理来自客户端的连接事件,处理代码由acceptor实现;另一个是处理读取和发送数据的事件,处理代码由Handler实现。
    Acceptor - 用以接受客户端的连接请求,然后创建Handler对连接进行后续的处理(读取,处理,发送数据)。
    Handler - 事件处理类,用以实现具体的业务逻辑。图中read,decode,compute,encode和send都是由handler实现的。
    Thread Pool - Thread Pool中的thread被称作worker thread。Handler中的decode,compute和encode是用worker thread执行的。值得注意的是Handler中的read和send方法是在Reactor线程而不是worker thread中执行的。这意味着对socket数据的读取发送数据和对数据的处理是在不同的线程中进行的.

    1.3 Reactor多线程模型的主要问题

    1. read和send会影响接受客户端连接的性能
前面分析过read和send是在Reactor线程中执行的,接受客户端的连接请求也是在Reactor线程中执行。这使得如果有read或者send耗时较长,会影响其他客户端连接的速度。
    2. Read和send性能不够高效
网络服务对于来自同一客户端的read和send是串行的,但是对于不同客户端之间的read和send是可以并行进行的。由于read和send运行在Reactor单线程中,不能充分发挥硬件能力。
    3. 线程上下文切换带来额外开销
前面提到的处理客户端请求的步骤依次是read,decode,process,encode,send。由于read和send是在Reactor线程中执行,而decode,process和encode是在worker thread线程中执行,引入了额外的线程切换开销,这种开销在高并发的时候会体现出来。

    2. 实际应用中的多线程模型
    Doug Lea文章中的”主从Reactor“模式可以解决上述第一个和第二个问题。它把接受客户端连接的
“主Reactor”单独运行在一个线程中,“从Reactor”有多个,组成一个Reactor Pool,每个”从Reactor“都运行在一个独立的线程上,具有自己的selector和dispatch loop。但第三个问题还是没有解决,read和send仍然是运行在”从Reactor“线程上,而decode,process和encode运行在worker thread上。

    要解决第三个问题,可以采用下面的方式。Reactor线程专门用于接受客户端连接(通过acceptor);创建多个Event Loop ,组成Event Loop Pool,每个Event Loop都有自己的Selector,并且运行在独立的线程上;Acceptor对于每一个客户端的连接从EventLoopPool中选择一个Event Loop进行处理,并且保证每个客户端连接在整个生命周期中都是由同一个Event Loop线程来处理,从而使得Handler中的实现-read,decode,process,encode,send-都在同一个线程中执行。整个线程模型除了高效的性能,还有非常重要的一点是Handler的实现不需要加锁,一方面对性能有帮助,另一方面避免多线程编程的复杂度。

    2.1 改进后的模型图

    3. 代码示例及注解

    class Reactor implements Runnable {
        final Selector selector;
        final ServerSocketChannel serverSocket;
        static final int EVENT_LOOP_POOL_SIZE = 4;
        final EventLoop[] eventLoopPool = new EventLoop[EVENT_LOOP_POOL_SIZE];
        int currentIndex = 0;
    
        Reactor(int port) throws IOException {
            // 创建并设置ServerSocketChannel对象
            serverSocket = ServerSocketChannel.open();
            serverSocket.socket().bind(new InetSocketAddress(port));
            serverSocket.configureBlocking(false);
            // 创建Selector对象,为ServerSocketChannel对象注册OP_ACCEPT事件
            selector = Selector.open();
            SelectionKey sk =
                    serverSocket.register(selector,
                            SelectionKey.OP_ACCEPT);
            // 在SelectionKey对象中保存Acceptor对象,
            // 这样Acceptor对象可以传递给OP_ACCEPT事件发生时的处理方法
            sk.attach(new Acceptor());
            // 创建EventLoop对象,每个EventLoop对象会启动一个线程
            initEventLoopPool();
        }
    
        void initEventLoopPool() {
            try {
                for (int i = 0; i < EVENT_LOOP_POOL_SIZE; i++) {
                    eventLoopPool[i] = new EventLoop();
                }
            } catch (IOException ex) { /* ... */ }
        }
    
        public void run() { // 运行在独立的线程
            try {
                while (!Thread.interrupted()) {
                    selector.select(); // 当没有客户端连接时,此语句会阻塞
                    // 执行到以下语句意味着有客户端连接了
                    Set selected = selector.selectedKeys();
                    Iterator it = selected.iterator();
                    while (it.hasNext()) {
                        // 取出保存在SelectionKey中的Acceptor对象,处理连接请求
                        SelectionKey sk = ((SelectionKey) (it.next()));
                        Acceptor acceptor = (Acceptor)sk.attachment();
                        acceptor.accept();
                    }
                    selected.clear();
                }
            } catch (IOException ex) { /* ... */ }
        }
    
        class Acceptor {
            public void accept() {
                try {
                    // 每个SocketChannel对象都代表一个和客户端的TCP连接,
                    // 读取客户端发过来的数据和发送数据给客户端都是通过SocketChannel对象进行
                    SocketChannel c = serverSocket.accept();
                    if (c != null)
                        // SocketChannel对象的数据读写和处理逻辑是由Handler实现的,
                        // Handler对象的方法是在选中的某一个EventLoop的线程中执行的(为什么?答案在Handler实现中)
                        new Handler(nextEventLoop().selector, c);
                } catch(IOException ex) { /* ... */ }
            }
    
            EventLoop nextEventLoop() {
                // 循环选择EventLoop(Round Robin方式),使得每个EventLoop负载均衡
                return eventLoopPool[(currentIndex++) % EVENT_LOOP_POOL_SIZE];
            }
        }
    }
    final class EventLoop implements Runnable {
        final Selector selector;
    
        EventLoop() throws IOException {
            // 每个EventLoop都有属于自己的Selector对象,
            // 并且运行在独立的线程上
            selector = Selector.open();
            new Thread(this).start();
        }
    
        public void run() { // normally in a new Thread
            try {
                while (!Thread.interrupted()) {
                    selector.select();
                    Set selected = selector.selectedKeys();
                    Iterator it = selected.iterator();
                    while (it.hasNext())
                        dispatch((SelectionKey) (it.next()));
                    selected.clear();
                }
            } catch (IOException ex) { /* ... */ }
        }
    
        void dispatch(SelectionKey k) {
            Runnable r = (Runnable) (k.attachment());
            if (r != null)
                r.run();
        }
    }
    final class Handler implements Runnable {
        static final int MAXIN = 5000;
        static final int MAXOUT = 10000;
        final SocketChannel socket;
        final SelectionKey sk;
        ByteBuffer input = ByteBuffer.allocate(MAXIN);
        ByteBuffer output = ByteBuffer.allocate(MAXOUT);
        static final int READING = 0, SENDING = 1;
        int state = READING;
    
        Handler(Selector sel, SocketChannel c)
                throws IOException {
            socket = c;
            c.configureBlocking(false);
            // 为什么此Handler会在EventLoop中执行?
            // 因为入参Selector对象来自于EventLoop,
            // EventLoop的线程负责调用此Selector对象的select方法等待事件的发生,
            // 事件发生后调用SelectionKey.attachment上的run方法,
            // 由于attachment就是此Handler对象,所以相当于在EventLoop线程中调用此Handler的run方法。
            sk = socket.register(sel, 0);
            sk.attach(this);
            sk.interestOps(SelectionKey.OP_READ);
            sel.wakeup();
        }
        boolean inputIsComplete() { /* ... */ }
        boolean outputIsComplete() { /* ... */ }
        void process() { /* ... */ }
    
        public void run() {
            try {
                if (state == READING) read();
                else if (state == SENDING) send();
            } catch (IOException ex) { /* ... */ }
        }
    
        void read() throws IOException {
            socket.read(input);
            // inputIsComplete方法是干什么用的?
            // 客户端发送过来的数据并不一定是全部放到buffer后再通知selector说数据就绪了,
            // 大部分情况是部分数据放到缓存以后就通知selector说数据就绪了。所以此read方法可能会被调用多次。
            // 用户需要在客户端和服务端约定好怎么样才算数据发送完了,
            // 比如可以用一个特殊的字符结尾,或者先用发送过来的头两个字节代表数据的长度等。
            if (inputIsComplete()) {
                process();
                state = SENDING;
                // Normally also do first write now
                sk.interestOps(SelectionKey.OP_WRITE);
            }
        }
    
        void send() throws IOException {
            socket.write(output);
            if (outputIsComplete()) sk.cancel();
        }
    }

    第一时间读文章,搜索作者公众号“码年”,或者扫码关注:

  • 相关阅读:
    软件使用[17]
    软件使用[20]
    软件使用[12]
    软件使用[10]
    软件使用[22]
    软件使用[06]
    软件使用[11]SlickEdit
    软件使用[19]
    uva 10717【Mint】
    uva 10791【 Minimum Sum LCM】
  • 原文地址:https://www.cnblogs.com/yufengzhang/p/11511690.html
Copyright © 2011-2022 走看看