zoukankan      html  css  js  c++  java
  • python并发编程之IO模型

    一 IO模型介绍

      为了更好地了解IO模型,我们需要事先回顾一下:同步,异步,阻塞,非阻塞

        同步(synchronous) IO和异步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分别是什么,到底有什么区别?这个问题其实不同的人给出的答案都可能不同,比如wiki,就认为asynchronous IO和non-blocking IO是一个东西。这其实是因为不同的人的知识背景不同,并且在讨论这个问题的时候上下文(context)也不相同。所以,为了更好的回答这个问题,我先限定一下本文的上下文。

        本文讨论的背景是Linux环境下的network IO。本文最重要的参考文献是Richard Stevens的“UNIX® Network Programming Volume 1, Third Edition: The Sockets Networking ”,6.2节“I/O Models ”,Stevens在这节中详细说明了各种IO的特点和区别,如果英文够好的话,推荐直接阅读。Stevens的文风是有名的深入浅出,所以不用担心看不懂。本文中的流程图也是截取自参考文献。

    stevens在文章一共比较了五种IO model:

    * blocking IO
        * nonblocking IO
        * IO multiplexing
        * signal driven IO
        * asynchronous IO
        由signal driven IO(信号驱动IO)在实际中并不常用,所以主要介绍其余四种IO Model。

        再说一下IO发生时涉及的对象和步骤。对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另一个就是系统内核(kernel)。当一个read操作发生时,该操作会经历两个阶段:

    #等待数据准备(waiting for the data to be ready)
    #将数据从内核拷贝到进程中(copying the data from the kernel to the process)

    记住这两点很重要,因为这些IO模型的区别就是在这两个阶段上各有各的不同情况。

    补充:

    1、输入操作:read 、readv、recv、recvfrom、recvmsg共五个函数,如果会阻塞状态,则会经过wait data 和copy data两个阶段,如果设置为非阻塞则在wait,不到data时抛出异常
    
    2、操作输出:write、writev、send、sendto、sendmsg共五个函数,在发送缓冲区满了灰阻塞在原地,如果设置为非阻塞,则会抛出异常
    3、接受外来连接:accept,与输入操作类似
    
    4、发出外来链接:connect,与输出操作类似

    二 阻塞IO(blocking IO)

    在linux,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样的

      

     当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于netword io来说,很多时候数据在一开始还说没有到达(比如,还没有收到一个完整UDP包),这个时候kernel就要等待足够的数据到来。

    而在用户进程这边,整个进程会被阻塞,当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存

    ,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。

     所以,blocking IO的特点就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)都被block了。

    很多程序员第一次接触到网络编程都是从listen()send() ecv()等借口开始的,使用这些借口可以很方便的构建服务器/客户机的模型。然而大部分的socket借口都是阻塞型的。

    PS:所谓阻塞型借口是指的是系统调用(一般是IO借口)不返回调用结果并让当前线程一直阻塞,只有当该系统调用获得结果或者超时出错时才返回。

    一个简单的解决方案:

    #在服务器端使用多线程(或者多进程)。多线程(或多进程)的目的是让每个连接都拥有独立线程(或进程),这样任何一个连接额阻塞都不会影响其他的连接

    该方案的问题是:

    #开启多线程或多进程的方式,在遇到要同时响应成百上千的连接请求,则无论多线程还是多进程都会严重占用系统资源,降低系统对外界响应效率,而且线程与进程本身也更容易进入假死状态

    改进方案:

    #很多程序员可能会考虑使用“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。“连接池”维持连接的缓存池,尽量重用已有的连接、减少创建和关闭连接的频率。这两种技术都可以很好的降低系统开销,都被广泛应用很多大型系统,如websphere、tomcat和各种数据库等。

    改进后方案其实也存在着问题

    #“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用IO接口带来的资源占用。而且,所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。

    对应上列中的所有面临的可能同时出现的上千甚至上万次的客户端请求,‘线程池’或者‘连接池’或许可以缓解部分压力,但是不能解决所有的问题。总之,多线程模型可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈,可以用非阻塞接口来尝试解决这个问题。

    三 非阻塞IO

    Linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子: 

    从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是用户就可以在本次到下次再发起read询问的时间间隔内做其他事情,或者直接再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存(这一阶段仍然是阻塞的),然后返回。

        也就是说非阻塞的recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,循环往复的进行recvform系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。

    所以,在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。

    服务器

    import
    time s=socket() s.bind(('127.0.0.1',8080)) s.listen(5) s.setblocking(False) r_list=[] w_list=[] while True: try: conn,addr=s.accept() r_list.append(conn) except BlockingIOError: print('可以去干其他的活了') print('rlist:',Len(r_list)) del_rlist=[] for item in w _list: try: data=conn.recv(1024) conn.send(data.upper()) except BlockingIOError: continue del_wlist=[] for item in w_list: try: conn=item[0] res=item[1] conn.send(res) del_wlist.append(item) except BlockingIOError: continue except ConnectionAbortedError: conn.close() del_wlist.append(item) #回收无用的连接 for conn in del_rlist: r_list.remove(conn) for item in del_wlist: w_list.remove(item)
    #客户端
    import socket
    import os
    client=socket。socket()
    client.connect(('127.0.0.1',8080))
    while True:
      res=('%s hello'%os.getpid()).encode('utf-8')
      client.send(res)
      data=client.recv(1024)
      print(data.decode('utf-8'))

    四 多路复用IO

    IO multiplexing这个词可能有点陌生,但是如果我说select/epoll,大概就都能明白了。有些地方也称这种IO方式为事件驱动IO(event driven IO)。我们都知道,select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。它的流程如图: 

    当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
        这个图和blocking IO的图其实并没有太大的不同,事实上还更差一些。因为这里需要使用两个系统调用(select和recvfrom),而blocking IO只调用了一个系统调用(recvfrom)。但是,用select的优势在于它可以同时处理多个connection。

    强调:

    1. 如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。

        2. 在多路复用模型中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

    结论:

    select的优势在于可以处理多个连接,不适用单个连接

    #select网络IO模型
    #服务端
    
    #服务端
    
    from socket import *
    import select
    server=socket(AF_INET,SOCK_STREAM)
    server.bind(('127.0.0.1',8093))
    server.listen(5)
    server.setblocking(False)
    print('starting.....')
    rlist=[server,]
    wlist=[]
    wdata=[]
    while True:
        rl,wl,xl=select.select(rlist,wlist,[],0.5)
        print(wl)
        for sock in rl:
            if sock ==server:
                conn,addr=sock.accept()
                rlist.append(conn)
            else:
                try:
                    data=sock.recv(1024)
                    if not data:
                        sock.close()
                        rlist.remove(sock)
                        continue
                    wlist.append(sock)
                    wdata[sock]=data.upper()
                except Exception:
                    sock.close()
                    rlist.remove(sock)
        for sock in wl:
            sock.send(wdata[sock])
            wlist.remove(sock)
            wdata.pop(sock)
    #客户端
    from socket import *
    
    client=socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',8093))
    
    
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
        client.send(msg.encode('utf-8'))
        data=client.recv(1024)
        print(data.decode('utf-8'))
    
    client.close()

    select监听fd变化的过程分析:

    #用户进程创建socket对象,拷贝监听的fd到内核空间,每一个fd会对应一张系统文件表,内核空间的fd响应到数据后,就会发送信号给用户进程数据已到;
    #用户进程再发送系统调用,比如(accept)将内核空间的数据copy到用户空间,同时作为接受数据端内核空间的数据清除,这样重新监听时fd再有新的数据又可以响应到了(发送端因为基于TCP协议所以需要收到应答后才会清除)。

    该模型的优点:

    #相比其他模型,使用select()的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多CPU,同时能够为多客户端提供服务。如果视图建立一个简单的时间驱动的服务器程序,这回模型已定有参考价值。

    该模型的缺点:

    #首先select()接口并不是实现“事件驱动”的最好选择。因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。很多操作系统提供了更为高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。如果需要实现更高效的服务器程序,类似epoll这样的接口更被推荐。遗憾的是不同的操作系统特供的epoll接口有很大差异,所以使用类似于epoll的接口实现具有较好跨平台能力的服务器会比较困难。
    #其次,该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。

    五 异步IO

    linux下的异步IO其实用的不多,从内核2.6版本才开始引入。先看一下它的流程

    用户进程发起read操作之后,立刻就开始去做其他的事。另一个方面,从kernel的角度,当kernel的角度,当它受到一个异步read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

  • 相关阅读:
    二维数组最大关联子数组
    四则运算(终极版)
    最大子数组
    四则运算(三) 记录日志
    四则运算(三)
    四则运算记录日志
    四则运算(二)
    简单web四则运算出题
    Daily Scrum
    Daily Scrum
  • 原文地址:https://www.cnblogs.com/wuchenyu/p/8982654.html
Copyright © 2011-2022 走看看