zoukankan      html  css  js  c++  java
  • tornado 采用 epoll 代理构建高并发网络模型

    1 阻塞和非阻塞

     对于阻塞和非阻塞,网上有一个很形象的比喻,就是说好比你在等快递,阻塞模式就是快递如果不到,你就不能做其他事情。非阻塞模式就是在这段时间里面,你可以做其他事情,比如上网、打游戏、睡觉等,很显然非阻塞的模式会效率更高。
     非阻塞的模式也分两种,第一种就是忙轮询,因为你不知道快递什么时候来,所以你每5分钟就跟快递打一次电话进行询问,另外一种就是我们这篇文章讲的 epoll 模型,在等待快递到达的时间内,你尽可以做其他任何事情,包括睡觉,当快递到达时,你就会被告知。

     那么阻塞在操作系统中到底是如何进行的呢?假设有一个管道,进程A为管道的写入方,B为管道的读出方。


    管道示意图


     假设一开始内核缓冲区是空的,B作为读出方,被阻塞着。然后首先A往管道写入,这时候内核缓冲区由空的状态变到非空状态,内核就会产生一个事件告诉B该醒来了,这个事件姑且称之为“缓冲区非空”。
     但是“缓冲区非空”事件通知B后,B却还没有读出数据;且内核许诺了不能把写入管道中的数据丢掉这个时候,A写入的数据会滞留在内核缓冲区中,如果内核也缓冲区满了,B仍未开始读数据,最终内核缓冲区会被填满,这个时候会产生一个I/O事件,告诉进程A,你该等等(阻塞)了,我们把这个事件定义为“缓冲区满”。
     假设后来B终于开始读数据了,于是内核的缓冲区空了出来,这时候内核会告诉A,内核缓冲区有空位了,你可以从长眠中醒来了,继续写数据了,我们把这个事件叫做“缓冲区非满”
     也许事件Y1已经通知了A,但是A也没有数据写入了,而B继续读出数据,知道内核缓冲区空了。这个时候内核就告诉B,你需要阻塞了!,我们把这个时间定为“缓冲区空”。

     这四个情形涵盖了四个I/O事件,缓冲区满,缓冲区空,缓冲区非空,缓冲区非满,这四个I/O事件是进行阻塞同步的根本。那么在我们的 client-server 模型是怎样发生阻塞的呢?
     socket 之间的通信就像这个管道,两端的 socket 会进入读取和写入。但请注意的是,写入仅仅表示数据被复制到了内核中的 TCP 发送缓冲区,至于什么时候发送到网络,什么时候被对方主机接收,什么时候被对方进程读取,系统调用层面不会给与任何通知。由于缓冲区的大小是有限的,当该socket的写入缓冲区满时会发生阻塞。所以,如果接收端进程从socket读数据的速度跟不上发送端进程向socket写数据的速度,会导致发送端write调用阻塞。
     读取阻塞则相对来说非常容易理解,就是该 socket 的读取缓冲区中没有数据时发生阻塞,通常是因为发送端的数据没有到达。如果想对缓冲区有一个感性的了解,可以在 Linux 下执行如下命令,查看本机 socket 的发送和读取缓冲区大小。如下图所示:


    查看socket缓冲区

     既然 socket 在读写的过程中会存在阻塞,那么如何进行非阻塞的socket 读写呢?很简单,我们可以记录所有这些流,通过写一个 for 循环把所有socket流从头到尾问一遍。但这样的做法显然不好,因为某些 socket 没有数据,则只会浪费 CPU 的时间。那怎么解决这个问题呢?答案就是引进一个 代理,通过代理来观察许多流的I/O 事件,在空闲的时候把当前线程阻塞掉,当一个或多个流有 I/O 事件时,就从阻塞态醒来,这个代理就是 select, poll 和 epoll 模型。

    2 select, poll, epoll 代理

    select 和 poll

      select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。

    while true {
        select(streams[])
        for i in streams[] {
            if i has data
            read until unavailable
        }
    }
    select的优点是支持目前几乎所有的平台,缺点主要有如下2个:
     1)单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。
     2)select 所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。
     poll则在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。

    epoll

     epoll是Linux 2.6 开始出现的为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。
     在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

    伪代码如下:

    while true {
        active_stream[] = epoll_wait(epollfd)
        for i in active_stream[] {
            read or write till
        }
    }

    3 采用 select 和 epoll 代理重构网络并发模型

     通过上面的分析,我们知道采用代理可以一次处理多个连接,那么到底是如何实现的呢,我们以上节课的代码为基础,分别使用 select 和 epoll 代理进行重构,并比较它们之间的区别。先给出代码如下:
    1) select 代理实现的 server代码:server_select.py

    #coding:utf-8
    import socket
    from time import ctime
    import select
    import Queue
    
    HOST = ''
    PORT = 21567
    BUFSIZE = 1024
    ADDR = ('127.0.0.1', PORT)
    
    # 服务器端创建 socket
    serverSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serverSock.bind(ADDR)
    serverSock.listen(5)
    
    inputs = [serverSock]
    
    outputs = []
    
    timeout = 20
    
    message_queues = {}
    
    while inputs:
        print "doing select ..."
    
        readable, writable, exceptional = select.select(inputs, outputs, inputs, timeout)
    
        for s in readable:
    
            if s is serverSock:
                server2client_Sock, addr = serverSock.accept()
                print " Connection from "
                server2client_Sock.setblocking(0)
                inputs.append(server2client_Sock)
    
                message_queues[server2client_Sock] = Queue.Queue()
    
            else:
                server2client_Sock = s
    
                data = server2client_Sock.recv(BUFSIZE)
    
                # 如果数据接收完,则退出 recv, 进入到下一个连接
                if data:
                    # server2client_Sock.send('[%s] %s' % (ctime(), data))
                    print "Received data from ", server2client_Sock.getpeername()
                    data = '[%s] %s' % (ctime(), data)
    
                    message_queues[server2client_Sock].put(data)
    
    
                    # 将建立连接的 socket 放入到可以写的 socket 列表中
                    if server2client_Sock not in outputs:
                        outputs.append(server2client_Sock)
                else:
                    if server2client_Sock in outputs:
                        outputs.remove(server2client_Sock)
    
                    inputs.remove(server2client_Sock)
    
                    server2client_Sock.close()
    
                    del message_queues[server2client_Sock]
    
        if s in writable:
            try:
                next_msg = message_queues[s].get_nowait()
            except Queue.Empty:
                print " " , s.getpeername() , 'queue empty'
                outputs.remove(s)
            else:
                print " sending " , next_msg , " to ", s.getpeername()
                s.send(next_msg)
    
    serverSock.close()

    2) epoll代理实现的 server代码:server_epoll.py

    #coding:utf-8
    import socket
    from time import ctime
    import select
    import Queue
    
    HOST = ''
    PORT = 21567
    BUFSIZE = 1024
    ADDR = ('127.0.0.1', PORT)
    
    # 服务器端创建 socket
    serverSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serverSock.bind(ADDR)
    serverSock.listen(5)
    
    timeout = 1000 # millisecond
    
    message_queues = {}
    
    # key state of socket io
    READ_ONLY = ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)
    READ_WRITE = (READ_ONLY|select.POLLOUT)
    
    
    poller = select.poll()
    poller.register(serverSock, READ_ONLY)
    
    fd_to_sockets = {serverSock.fileno(): serverSock, }
    
    while True:
        print "Waiting for next event ..."
        events = poller.poll(timeout)
    
        for fd, flag in events:
            s = fd_to_sockets[fd]
            if flag & (select.POLLIN | select.POLLPRI):
                if s is serverSock:
                    server2client_Sock, addr = serverSock.accept()
                    print "Connetion from ", addr
                    server2client_Sock.setblocking(0)
    
                    fd_to_sockets[server2client_Sock.fileno()] = server2client_Sock
                    poller.register(server2client_Sock, READ_ONLY)
    
                    message_queues[server2client_Sock] = Queue.Queue()
    
                else:
                    server2client_Sock = s
    
                    data = server2client_Sock.recv(BUFSIZE)
    
                    if data:
                        print "Received data from ", server2client_Sock.getpeername()
                        data = '[%s] %s' % (ctime(), data)
    
                        message_queues[server2client_Sock].put(data)
    
    
                        # 将建立连接的 socket 放入到可以写的 socket 列表中
                        poller.modify(server2client_Sock, READ_WRITE)
                    else:
                        poller.unregister(server2client_Sock)
                        server2client_Sock.close()
                        del message_queues[server2client_Sock]
            else:
                try:
                    next_msg = message_queues[s].get_nowait()
                except Queue.Empty:
                    print " " , s.getpeername() , 'queue empty'
                    poller.modify(s, READ_ONLY)
                else:
                    print " sending " , next_msg , " to ", s.getpeername()
                    s.send(next_msg)
    
    serverSock.close()

    对于 select 代理最核心的调用就是

    readable, writable, exceptional = select.select(inputs, outputs, inputs, timeout)

    该调用将可读可写的socket存储到 readable 和 writable 列表中,从而我们可以直接调用这些 socket的 recv 和 send 时不会发生阻塞。注意除了 serverSock 只读以外,其他 socket 都会存在同时存在于 inputs 和 outputs 列表中。

    对于 epoll 代理最核心的就是

    events = poller.poll(timeout)

    该调用不需要输入观察的 socket,它是之前通过 register 来指定的。和 select 模式中的代码一样,这些 socket 都是可读可写的,通过如下代码实现:

    poller.modify(server2client_Sock, READ_WRITE)

    4 问题分析

      至此使用 epoll 代理重构我们的 server已经做完了,大家可以同时运行多个 client.py 进行交互,我们可以观察到该 server 具备了同时处理多个客户端连接的能力,而且所有的 socket 都是非阻塞的。
      然而,对于这样一个简单的服务(将客户端发送的数据加上时间戳返回)我们的代码结构看起来已经非常复杂,所以如果要处理我们实际业务中的逻辑处理简直是无法想象。
     另外,对于超高的并发请求,仅仅采用 epoll 模型是不够的,我们还必须使用多进程多线程等方式来充分利用系统资源。
     关于后面这个问题会是本系列文章重点讨论的部分,也是tornado 源码中的核心部分,我们会在稍微后面文章中去讨论。接下来的几篇文章中我们会尝试先梳理代码结构的问题,真正进入到 tornado 源码的部分,看看 tornado 的如何进行架构的。与其他torando源码文章不同的是,我们不会直接按模块挨个挨个分析代码,这种方式既晦涩难懂又非常低效,而是尝试从零开始搭建它,也就是说我们会仿造 tornado 的框架结构,从最简单开始,一步一步去实现 tornado,从而可以从内部逐步理清 tornado 这个优秀高并发框架的脉络。

  • 相关阅读:
    day03 字符串
    day02 运算符和编码
    day01 初识Python
    windows 安装yaml支持和pytest支持等
    Python自动补全缩写意义
    关于python接口测试connect error
    关于Python的post请求报504错误
    python函数参数*args **kwargs
    利用Python语言Appium启动ios app
    shell 中| 用法
  • 原文地址:https://www.cnblogs.com/Erick-L/p/7864827.html
Copyright © 2011-2022 走看看