zoukankan      html  css  js  c++  java
  • Python进阶(5)_进程与线程之协程、I/O模型

    三、协程 

    3.1协程概念

    协程:又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程。

      协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

      协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程

    协程的好处:

    • 无需线程上下文切换的开销

    • 无需原子操作锁定及同步的开销方便切换控制流,简化编程模型

      "原子操作(atomic operation)是不需要synchronized",所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。

    • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。

    协程的缺点:

    • 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。

    • 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

    协程定义或标准(满足1,2,3就可称为协程):

    1. 必须在只有一个单线程里实现并发

    2. 修改共享数据不需加锁

    3. 用户程序里自己保存多个控制流的上下文栈

    4. 一个协程遇到IO操作自动切换到其它协程

        “上下文”,指的是程序在执行中的一个状态。通常我们会用调用栈来表示这个状态——栈记载了每个调用层级执行到哪里,还有执行时的环境情况等所有有关的信息。

        “上下文切换”,表达的是一种从一个上下文切换到另一个上下文执行的技术。而“调度”指的是决定哪个上下文可以获得接下去的CPU时间的方法。

    与线程比较:

      1. python的线程属于内核级别的,即由操作系统控制调度(如单线程一旦遇到io就被迫交出cpu执行权限,切换其他线程运行)

      2. 单线程内开启协程,一旦遇到io,从应用程序级别(而非操作系统)控制切换

    对比操作系统控制线程的切换,用户在单线程内控制协程的切换,优点如下:

      1.  协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级

      2. 单线程内就可以实现并发的效果,最大限度地利用cpu

    用yield生成器函数实现单线程下保存程序的运行状态:

    import time
    
    def consumer():
        r = ''
        while True:
            n = yield r
            print('[CONSUMER] ←← Consuming %s...' % n)
            time.sleep(1)
            r = '200 OK'
    
    def produce(c):
        next(c)
        n = 0
        while n < 5:
            n = n + 1
            print('[PRODUCER] →→ Producing %s...' % n)
            cr = c.send(n)  #  cr="200 ok"
            print('[PRODUCER] Consumer return: %s' % cr)
        c.close()
    
    if __name__=='__main__':
        c=consumer()  # c:生成器对象
        produce(c)

    3.2 greenlet类实现协程

      greenlet机制的主要思想是:生成器函数或者协程函数中的yield语句挂起函数的执行,直到稍后使用next()或send()操作进行恢复为止。可以使用一个调度器循环在一组生成器函数之间协作多个任务。greentlet是python中实现我们所谓的"Coroutine(协程)"的一个基础库.

    用greenlet类实现协程举例:

    from greenlet import greenlet
     
    def test1():
        print (12)
        gr2.switch()
        print (34)
        gr2.switch()
     
    def test2():
        print (56)
        gr1.switch()
        print (78)
     
    gr1 = greenlet(test1)
    gr2 = greenlet(test2)
    
    gr1.switch()
        
    >>:12
        56
        34
        78  

     

    3.3 基于greenlet类用 gevent模块实现协程

      Python通过yield提供了对协程的基本支持,但是不完全。而第三方的gevent为Python提供了比较完善的协程支持。

    gevent是第三方库,通过greenlet实现协程,其基本思想是:

      当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。

      由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成:

      

    用gevent模块实现爬虫

    from gevent import monkey
    monkey.patch_all()
    import requests,gevent,time
    
    def foo(url):
        respnse=requests.get(url)
        respnse_str=respnse.text
        print("GET data %s"%len(respnse_str))
    
    s=time.time()
    gevent.joinall([gevent.spawn(foo,"https://itk.org/"),
                    gevent.spawn(foo, "https://www.github.com/"),
                    gevent.spawn(foo, "https://baidu.com/")])
    
    print(time.time()-s)
    

    上例中还可以用gevent.sleep(2)来模拟gevent可以识别的i/o阻塞

    而time.sleep(2)或其他的阻塞 gevent是不能直接识别的,需要添加补丁,添加补丁代码如下:

    from gevent import monkey
    monkey.patch_all()

    补丁代码必须放在导入其他模块之前,及放在文件开头

     

    附:用进程池、多线程、协程爬虫时间比较

    from gevent import monkey
    monkey.patch_all()
    import requests
    import re
    from multiprocessing import Pool
    import time,threading
    import gevent
    
    def getpage(res):
        response_str=requests.get(res)
        print('ecdoing is :',response_str.encoding)
        return response_str.text
    
    def js(ret):
        li=[]
        for item in ret:
            dic={'title':item[2],'date':item[1],'评论数':item[0]}
            li.append(dic)
        f=open('acfun.txt','a',encoding='utf-8')
        for i in li:
            f.write(str(i))
            f.write('
    ')
        f.close()
    
    def run(n):
        url='http://www.acfun.cn/v/list73/index_%s.htm'%n
        print(url)
        response=getpage(url)
        # response=response.encode('ISO-8859-1').decode('utf-8')
        obj=re.compile('<span class="a">(d+)</span>.*?<a href=.*? target=".*?" title="发布于 (.*?)" class="title">(.*?)</a>',re.S)
        # obj = re.compile(r'<img.*?src=.(S+.jpg).*?', re.S)
        ret=obj.findall(response)
        # print(ret)
        return js(ret)
    
    
    if __name__ == '__main__':
    
        start_time=time.time()
    
        #顺序执行
        # start_time=time.time()
        # for j in range(1,100):
        #     run(j)
        # #顺序执行cost time: 51.30734419822693
    
        #多线程并发执行
        # li=[]
        # for j in range(1,100):
        #     j = threading.Thread(target=run, args=(j,))
        #     j.start()
        #     li.append(j)
        # for obj in li:
        #     obj.join()
        # 并发执行不使用join cost time: 0.20418000221252441
        # 并发执行使用join cost time: 4.524945974349976
    
        #使用进程池
        # p = Pool(5)
        # for i in range(1,100):
        #     p.apply_async(func=run,args=(i,))
        # p.close()
        # p.join()
        #使用进程池cost time: 6.876262426376343
    
        #使用协程
        li = []
        for i in range(1, 100):
            li.append(gevent.spawn(run, i))
        gevent.joinall(li)
        #使用协程第一次cost time: 4.432950973510742
        #使用协程第二次cost time: 30.864907264709473
        #使用协程第三次cost time: 13.472567558288574
    
    
        end_time=time.time()
        print('cost time:', end_time-start_time)
    使用多线程、进程池、协程爬虫时间比较

    四、I/O模型

    Linux环境下的network IO Model分为:

    •     blocking IO
    •     nonblocking IO
    •     IO multiplexing
    •     signal driven IO
    •     asynchronous IO

    由于signal driven IO在实际中并不常用,所以我这只提及剩下的四种IO Model。
    再说一下IO发生时涉及的对象和步骤。
      对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另一个就是系统内核(kernel)。当一个read操作发生时,它会经历两个阶段:

    •  等待数据准备 (Waiting for the data to be ready)
    •  将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)

    记住这两点很重要,因为这些IO Model的区别就是在两个阶段上各有不同的情况。

    4.1 blocking IO (阻塞IO)

    在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:

      当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。

    blocking IO的特点:在IO执行的两个阶段都被block了,全程阻塞

    4.2 non-blocking IO(非阻塞IO)

    linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:

      从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。所以,用户进程其实是需要不断的主动询问kernel数据好了没有。

    优点:能够在等待任务完成的时间里干其他活了(包括提交其他任务,也就是 “后台” 可以有多个任务在同时执行)。

    缺点:任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。这会导致整体数据吞吐量的降低。

    4.3 IO multiplexing(IO多路复用)

       IO multiplexing这个词可能有点陌生,但是如果我说select,epoll,大概就都能明白了。有些地方也称这种IO方式为event driven IO。我们都知道,select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。它的流程如图:

     

      当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
      这个图和blocking IO的图其实并没有太大的不同,事实上,还更差一些。因为这里需要使用两个system call (select 和 recvfrom),而blocking IO只调用了一个system call (recvfrom)。但是,用select的优势在于它可以同时处理多个connection

      (所以,如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)
      在IO multiplexing Model中,实际中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

    结论: select的优势在于可以处理多个连接,不适用于单个连接 

     4.4 Asynchronous I/O(异步IO)

     linux下的asynchronous IO其实用得很少。先看一下它的流程: 

     

      用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

    4.5 IO模型比较分析

     各个IO Model的比较如图所示:

     

    4.6 selectors模块

    import selectors
    import socket
    
    sel = selectors.DefaultSelector()
    
    def accept(sock, mask):
        conn, addr = sock.accept()  # Should be ready
        print('accepted', conn, 'from', addr)
        conn.setblocking(False)
        sel.register(conn, selectors.EVENT_READ, read)
    
    def read(conn, mask):
        data = conn.recv(1000)  # Should be ready
        if data:
            print('echoing', repr(data), 'to', conn)
            conn.send(data)  # Hope it won't block
        else:
            print('closing', conn)
            sel.unregister(conn)
            conn.close()
    
    sock = socket.socket()
    sock.bind(('localhost', 1234))
    sock.listen(100)
    sock.setblocking(False)
    sel.register(sock, selectors.EVENT_READ, accept)
    
    while True:
        events = sel.select()
        for key, mask in events:
            callback = key.data
            callback(key.fileobj, mask)
    

      

  • 相关阅读:
    法正(44):鄙视
    法正(43):许攸
    法正(42):孔明
    法正(41):帮派
    法正(40):常委
    法正(39):寡妇
    法正(38):吴苋
    法正(37):刘备
    法正(29):暗弱
    法正(36):花瓶
  • 原文地址:https://www.cnblogs.com/hedeyong/p/7214345.html
Copyright © 2011-2022 走看看