zoukankan      html  css  js  c++  java
  • 14、python开发之路-并发编程之I/O模型

    十四、并发编程之I/O模型

    http://www.cnblogs.com/linhaifeng/articles/7454717.html

    1、模型介绍

    1.1 IO种类

    (1)* blocking IO 阻塞IO

    (2)* nonblocking IO 非阻塞IO

    (3)* IO multiplexing 多路复用IO

    (4)* signal driven IO 信号驱动IO

    (5)* asynchronous IO 异步IO

    1.2 IO发生时的对象和步骤

    对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IOprocess (or thread),另一个就是系统内核(kernel)

    1.3 IO经历的阶段

    (1)等待数据准备

    (2)将数据从内核(操作系统内存)拷贝到进程(应用程序内存)中

    2、阻塞IO

    linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:

     

    blocking IO的特点就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)都被block了。

    3、非阻塞IO

    3.1 流程

    Linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:

     

    在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。  但是非阻塞IO模型绝不被推荐。

    3.2 实例

    #服务端
    from socket import *
    import time
    s=socket(AF_INET,SOCK_STREAM)
    s.bind(('127.0.0.1',8080))
    s.listen(5)
    s.setblocking(False) #设置socket的接口为非阻塞
    conn_l=[]
    del_l=[]
    while True:
        try:
            conn,addr=s.accept()
            conn_l.append(conn)
        except BlockingIOError:
            print(conn_l)
            for conn in conn_l:
                try:
                    data=conn.recv(1024)
                    if not data:
                        del_l.append(conn)
                        continue
                    conn.send(data.upper())
                except BlockingIOError:
                    pass
                except ConnectionResetError:
                    del_l.append(conn)

            for conn in del_l:
                conn_l.remove(conn)
                conn.close()
            del_l=[]


    #客户端
    from socket import *
    c=socket(AF_INET,SOCK_STREAM)
    c.connect(('127.0.0.1',8080))

    while True:
        msg=input('>>: ')
        if not msg:continue
        c.send(msg.encode('utf-8'))
        data=c.recv(1024)
        print(data.decode('utf-8'))

    3.3 优缺点

    优点:能够在等待任务完成的时间里干其他活了

    缺点:

        #1. 循环调用recv()将大幅度推高CPU占用率;这也是我们在代码中留一句time.sleep(2)的原因,否则在低配主机下极容易出现卡机情况

        #2. 任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。这会导致整体数据吞吐量的降低。

    4、多路复用IO

    4.1 流程

    有些地方也称这种IO方式为事件驱动IO(event driven IO)。我们都知道,select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。它的流程如图:

     

    当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。

        强调:

        1. 如果处理的连接数不是很高的话,使用select/epollweb server不一定比使用multi-threading + blocking IOweb server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。

        2. 在多路复用模型中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IOblock

        结论: select的优势在于可以处理多个连接,不适用于单个连接

    4.2 实例

    #服务端
    from socket import *
    import select

    s=socket(AF_INET,SOCK_STREAM)
    s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    s.bind(('127.0.0.1',8081))
    s.listen(5)
    s.setblocking(False) #设置socket的接口为非阻塞
    read_l=[s,]
    while True:
        r_l,w_l,x_l=select.select(read_l,[],[])
        print(r_l)
        for ready_obj in r_l:
            if ready_obj == s:
                conn,addr=ready_obj.accept() #此时的ready_obj等于s
                read_l.append(conn)
            else:
                try:
                    data=ready_obj.recv(1024) #此时的ready_obj等于conn
                    if not data:
                        ready_obj.close()
                        read_l.remove(ready_obj)
                        continue
                    ready_obj.send(data.upper())
                except ConnectionResetError:
                    ready_obj.close()
                    read_l.remove(ready_obj)

    #客户端
    from socket import *
    c=socket(AF_INET,SOCK_STREAM)
    c.connect(('127.0.0.1',8081))

    while True:
        msg=input('>>: ')
        if not msg:continue
        c.send(msg.encode('utf-8'))
        data=c.recv(1024)
        print(data.decode('utf-8'))

    4.3 优缺点

    优点:

    #相比其他模型,使用select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,同时能够为多客户端提供服务。如果试图建立一个简单的事件驱动的服务器程序,这个模型有一定的参考价值。

    缺点:

    #首先select()接口并不是实现“事件驱动”的最好选择。因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。很多操作系统提供了更为高效的接口,如linux提供了epollBSD提供了kqueueSolaris提供了/dev/poll,…。如果需要实现更高效的服务器程序,类似epoll这样的接口更被推荐。遗憾的是不同的操作系统特供的epoll接口有很大差异,所以使用类似于epoll的接口实现具有较好跨平台能力的服务器会比较困难。

    #其次,该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。

    5、异步IO

    Linux下的asynchronous IO其实用得不多,从内核2.6版本才开始引入。先看一下它的流程:

     

    用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

    6selectors模块

    select,poll,epoll这三种IO多路复用模型在不同的平台有着不同的支持,而epollwindows下就不支持,好在我们有selectors模块,帮我们默认选择当前平台下最合适的

    #服务端
    from socket import *
    import selectors

    sel=selectors.DefaultSelector()
    def accept(server_fileobj,mask):
        conn,addr=server_fileobj.accept()
        sel.register(conn,selectors.EVENT_READ,read)

    def read(conn,mask):
        try:
            data=conn.recv(1024)
            if not data:
                print('closing',conn)
                sel.unregister(conn)
                conn.close()
                return
            conn.send(data.upper()+b'_SB')
        except Exception:
            print('closing', conn)
            sel.unregister(conn)
            conn.close()

    server_fileobj=socket(AF_INET,SOCK_STREAM)
    server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    server_fileobj.bind(('127.0.0.1',8088))
    server_fileobj.listen(5)
    server_fileobj.setblocking(False) #设置socket的接口为非阻塞
    sel.register(server_fileobj,selectors.EVENT_READ,accept) #相当于网select的读列表里append了一个文件句柄server_fileobj,并且绑定了一个回调函数accept

    while True:
        events=sel.select() #检测所有的fileobj,是否有完成wait data的
        for sel_obj,mask in events:
            callback=sel_obj.data #callback=accpet
            callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)

    #客户端
    from socket import *
    c=socket(AF_INET,SOCK_STREAM)
    c.connect(('127.0.0.1',8088))

    while True:
        msg=input('>>: ')
        if not msg:continue
        c.send(msg.encode('utf-8'))
        data=c.recv(1024)
        print(data.decode('utf-8'))

    7socketsever模块

    服务端:

    import socketserver

    class MyTCPhandler(socketserver.BaseRequestHandler):
        def handle(self):
            # print(self)
            # print(self.request) #self.request=conn
            while True:
                try:
                    print(self.client_address)
                    print(self.server) #obj
                    data=self.request.recv(1024)
                    if not data:break
                    self.request.send(data.upper())
                except ConnectionResetError:
                    break

    if __name__ == '__main__':
        print('starting...')
        obj=socketserver.ThreadingTCPServer(('127.0.0.1',8080),MyTCPhandler)
        #bind_and_activate=True
        #socketserver.ForkingTCPServer
        obj.serve_forever() #连接循环

        '''
        t=Thread(target=MyTCPhandler().handle)
        t.start()
        '''

        # obj.server_address #('127.0.0.1',8080)
        # obj.RequestHandlerClass #MyTCPhandler
        # obj.socket #s对象

    客户端:

    from socket import *

    c=socket(AF_INET,SOCK_STREAM)
    c.connect(('127.0.0.1',8080))

    while True:
        msg=input('>>: ').strip()
        if not msg:continue
        c.send(msg.encode('utf-8'))
        data=c.recv(1024)
        print(data.decode('utf-8'))

    8、并发编程总结(重点)

    1 生产者消费者模型

    2 开启进程、开启线程

    3 进程池、线程池(回调机制)

    4 GIL全局解释器锁

  • 相关阅读:
    链表详解自带代码
    队列
    单词翻转
    表达式求值
    一元多项式
    循环链表
    学生成绩管理系统
    双向循环链表
    双向链表
    静态链表
  • 原文地址:https://www.cnblogs.com/xiaojinyu/p/7729760.html
Copyright © 2011-2022 走看看