zoukankan      html  css  js  c++  java
  • 初学Python——协程

    进程、线程和协程区分

     我们通常所说的协程Coroutine其实是corporate routine的缩写,直接翻译为协同的例程,一般我们都简称为协程。

     在linux系统中,线程就是轻量级的进程,而我们通常也把协程称为轻量级的线程即微线程。协程的本质是单线程,CPU不知道协程的存在,协程机制是在上层做的封装。

     不管进程、线程还是协程,它们有一个共同的目的就是提高CPU的利用率,使得CPU尽可能少地空转。

    进程和协程:

    相同点:

    相同点存在于,当我们挂起一个执行流的时,我们要保存的东西:

    • 栈, 其实在你切换前你的局部变量,以及要函数的调用都需要保存,否则都无法恢复
    • 寄存器状态,这个其实用于当你的执行流恢复后要做什么

    而寄存器和栈的结合就可以理解为上下文,上下文切换的理解:
    CPU看上去像是在并发的执行多个进程,这是通过处理器在进程之间切换来实现的,操作系统实现这种交错执行的机制称为上下文切换

    操作系统保持跟踪进程运行所需的所有状态信息。这种状态,就是上下文。
    在任何一个时刻,操作系统都只能执行一个进程代码,当操作系统决定把控制权从当前进程转移到某个新进程时,就会进行上下文切换,即保存当前进程的上下文,恢复新进程的上下文,然后将控制权传递到新进程,新进程就会从它上次停止的地方开始。

    不同点:

    • 执行流的调度者不同,进程是内核调度,而协程是在用户态调度,也就是说进程的上下文是在内核态保存恢复的,而协程是在用户态保存恢复的,很显然用户态的代价更低
    • 进程会被强占,而协程不会,也就是说协程如果不主动让出CPU,那么其他的协程,就没有执行的机会。
    • 对内存的占用不同,实际上协程可以只需要4K的栈就足够了,而进程占用的内存要大的多
    • 从操作系统的角度讲,多协程的程序是单进程,单协程

    线程和协程:

    既然我们上面也说了,协程也被称为微线程,下面对比一下协程和线程:

    • 线程之间需要上下文切换成本相对协程来说是比较高的,尤其在开启线程较多时,但协程的切换成本非常低。
    • 同样的线程的切换更多的是靠操作系统来控制,而协程的执行由我们自己控制。

      协程只是在单一的线程里不同的协程之间切换,其实和线程很像,线程是在一个进程下,不同的线程之间做切换,这也可能是协程称为微线程的原因吧。

    协程的优缺点:

    协程的好处:

    • 无需线程上下文切换的开销
    • 无需原子操作锁定及同步的开销方便切换控制流,简化编程模型(所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。)
    • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。

    缺点:

    • 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
    • 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

    协程的标准操作模块:

    • greenlet
    • gevent

    1.  greenlet协程

      此模块可以实现协程,但是需要手动切换,因此用的不多。

    import greenlet
    "greenlet可进行手动切换"
    
    
    def test1():
        print(12)
        gr2.switch() # 启动gr2
        print(34)
        gr2.switch() # 切换回gr2
    
    
    def test2():
        print(56)
        gr1.switch() # 切换gr1
        print(78)
    
    
    gr1 = greenlet.greenlet(test1)  # 启动一个协程
    gr2 = greenlet.greenlet(test2)
    gr1.switch() # 启动/切换
    greenlet协程

    2.  gevent协程

      Gevent是一种基于协程的Python网络库,它用到Greenlet提供的,封装了libevent事件循环的高层同步API。它让开发者在不改变编程习惯的同时,用同步的方式写异步I/O的代码。

    import gevent
    
    def func1():
        print('33[31;1m李闯在跟海涛搞...33[0m')
        gevent.sleep(0)
        print('33[31;1m李闯又回去跟继续跟海涛搞...33[0m')
    
    
    def func2():
        print('33[32;1m李闯切换到了跟海龙搞...33[0m')
        gevent.sleep(0)
        print('33[32;1m李闯搞完了海涛,回来继续跟海龙搞...33[0m')
    
    def func3():
        print("你好,切换到了func3")
        gevent.sleep(0)
        print("func3执行完毕")
    
    def func4():
        print("切换到func4")
        gevent.sleep(0)
        print("func4执行完毕")
    
    gevent.joinall([
        gevent.spawn(func1), # 发起一个协程
        gevent.spawn(func2),
        gevent.spawn(func3),
        gevent.spawn(func4)
    ])
    gevent协程示例

      当协程遇到阻塞时,自动切换到其他不阻塞的协程。什么叫阻塞呢?使用sleep()函数对于CPU来说处于空闲状态,所以称为阻塞,IO操作时对于CPU也是阻塞。

      通过改变上面代码中三个函数的阻塞时间可以更好地观察这个过程。

    猴子补丁 Monkey patching

    这个补丁是Gevent模块最需要注意的问题,有了它,才会让Gevent模块发挥它的作用。简单来说就是,gevent并不认识其它模块的IO操作,所以无法进行自动切换,使用猴子补丁后,会把当前所有IO操作都单独做上标记,gevent协程在有阻塞的地方做自动切换了。官网上还是建议使用”patch_all()”,而且在程序的第一行就执行。 

    from gevent import monkey;monkey.patch_all()
    "Python中的分号表示分隔,两条语句出现在同一行时必须要加分号"
    import gevent
    from urllib.request import urlopen
    import time
    
    def fcun(url):
        print('GET: %s' % url)
        resp = urlopen(url)
        data = resp.read()
        print('%d bytes received from %s.' % (len(data), url))
    
    a=["https://www.python.org/","https://www.yahoo.com/","https://github.com/"]
    start_time = time.time()
    # for i in a:
    #     fcun(i)
    
    gevent.joinall([
        gevent.spawn(fcun, 'https://www.python.org/'),
        gevent.spawn(fcun, 'https://www.yahoo.com/'),
        gevent.spawn(fcun, 'https://github.com/'),
    ])
    
    end_time = time.time()
    print("用时:",end_time-start_time)
    
    '''
    经过测试运行时间发现,多协程的执行效率要比串行执行要快很多
    '''
    gevent协程示例2

    协程间通信

    1.事件(Event)对象

      greenlet协程间的异步通讯可以使用事件(Event)对象。该对象的”wait()”方法可以阻塞当前协程,而”set()”方法可以唤醒之前阻塞的协程。在下面的例子中,5个waiter协程都会等待事件evt,当setter协程在3秒后设置evt事件,所有的waiter协程即被唤醒。

    import gevent
    from gevent.event import Event
      
    evt = Event()
      
    def setter():
        print 'Wait for me'
        gevent.sleep(3)  # 3秒后唤醒所有在evt上等待的协程
        print "Ok, I'm done"
        evt.set()  # 唤醒
      
    def waiter():
        print "I'll wait for you"
        evt.wait()  # 等待
        print 'Finish waiting'
      
    gevent.joinall([
        gevent.spawn(setter),
        gevent.spawn(waiter),
        gevent.spawn(waiter),
        gevent.spawn(waiter),
        gevent.spawn(waiter),
        gevent.spawn(waiter)
    ])
    View Code

    2.AsyncResult事件

    除了Event事件外,gevent还提供了AsyncResult事件,它可以在唤醒时传递消息。让我们将上例中的setter和waiter作如下改动:  

    from gevent.event import AsyncResult
    aevt = AsyncResult()
      
    def setter():
        print 'Wait for me'
        gevent.sleep(3)  # 3秒后唤醒所有在evt上等待的协程
        print "Ok, I'm done"
        aevt.set('Hello!')  # 唤醒,并传递消息
      
    def waiter():
        print("I'll wait for you")
        message = aevt.get()  # 等待,并在唤醒时获取消息
        print 'Got wake up message: %s' % message
    View Code

    3. 队列 Queue

    队列Queue的概念相信大家都知道,我们可以用它的put和get方法来存取队列中的元素。gevent的队列对象可以让greenlet协程之间安全的访问。运行下面的程序,你会看到3个消费者会分别消费队列中的产品,且消费过的产品不会被另一个消费者再取到:

    import gevent
    from gevent.queue import Queue
     
    products = Queue()
     
    def consumer(name):
        #while not products.empty():
        while True:
            try:
                print('%s got product %s' % (name, products.get_nowait()))
                gevent.sleep(0)
            except gevent.queue.Empty:
                break
     
        print('Quit')
     
     
    def producer():
        for i in range(1, 10):
            products.put(i)
     
    gevent.joinall([
        gevent.spawn(producer),
        gevent.spawn(consumer, 'steve'),
        gevent.spawn(consumer, 'john'),
        gevent.spawn(consumer, 'nancy'),
    ])
    View Code

       注意:协程队列跟线程队列是一样的,put和get方法都是阻塞式的,它们都有非阻塞的版本:put_nowait和get_nowait。如果调用get方法时队列为空,则是不会抛出”gevent.queue.Empty”异常。我们只能使用get_nowait()的方式让气抛出异常。

    4.信号量

      信号量可以用来限制协程并发的个数。它有两个方法,acquire和release。顾名思义,acquire就是获取信号量,而release就是释放。当所有信号量都已被获取,那剩余的协程就只能等待任一协程释放信号量后才能得以运行:

    import gevent
    from gevent.coros import BoundedSemaphore
      
    sem = BoundedSemaphore(2)
      
    def worker(n):
        sem.acquire()
        print('Worker %i acquired semaphore' % n)
        gevent.sleep(0)
        sem.release()
        print('Worker %i released semaphore' % n)
      
    gevent.joinall([gevent.spawn(worker, i) for i in xrange(0, 6)])
    View Code

    上面的例子中,我们初始化了”BoundedSemaphore”信号量,并将其个数定为2。所以同一个时间,只能有两个worker协程被调度。程序运行后的结果如下:

    Worker 0 acquired semaphore
    Worker 1 acquired semaphore
    Worker 0 released semaphore
    Worker 1 released semaphore
    Worker 2 acquired semaphore
    Worker 3 acquired semaphore
    Worker 2 released semaphore
    Worker 3 released semaphore
    Worker 4 acquired semaphore
    Worker 4 released semaphore
    Worker 5 acquired semaphore
    Worker 5 released semaphore
    View Code

     如果信号量个数为1,那就等同于同步锁。

    协程本地变量

    同线程类似,协程也有本地变量,也就是只在当前协程内可被访问的变量:

    import gevent
    from gevent.local import local
      
    data = local()
      
    def f1():
        data.x = 1
        print data.x
      
    def f2():
        try:
            print data.x
        except AttributeError:
            print 'x is not visible'
      
    gevent.joinall([
        gevent.spawn(f1),
        gevent.spawn(f2)
    ])
    View Code

     通过将变量存放在local对象中,即可将其的作用域限制在当前协程内,当其他协程要访问该变量时,就会抛出异常。不同协程间可以有重名的本地变量,而且互相不影响。因为协程本地变量的实现,就是将其存放在以的”greenlet.getcurrent()”的返回为键值的私有的命名空间内。

     多并发socket

    服务器端:

    import socket
    import gevent
     
    from gevent import socket, monkey
     
    monkey.patch_all()
     
     
    def server(port):
        s = socket.socket()
        s.bind(('0.0.0.0', port))
        s.listen(500)
        while True:
            cli, addr = s.accept()
            gevent.spawn(handle_request, cli)
     
     
    def handle_request(conn):
        try:
            while True:
                data = conn.recv(1024)
                print("recv:", data)
                conn.send(data)
                if not data:
                    conn.shutdown(socket.SHUT_WR)
     
        except Exception as ex:
            print(ex)
        finally:
            conn.close()
     
     
    if __name__ == '__main__':
        server(8001)
    View Code

    当客户端连接上服务器端时,服务器端通过开辟一个协程与该客户端完成交互任务,同时由于使用了Gevent协程的方式,在每个客户端与服务器交互时,并不会影响到服务器端的工作。

    客户端:

    import socket
     
    HOST = 'localhost'  # The remote host
    PORT = 8001         # The same port as used by the server
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((HOST, PORT))
    while True:
        msg = bytes(input(">>:"), encoding="utf8")
        s.sendall(msg)
        data = s.recv(1024)
        # print(data)
     
        print('Received', repr(data))  #  repr 格式化输出
    s.close()
    View Code
  • 相关阅读:
    阿里云高级技术专家周晶:基于融合与协同的边缘云原生体系实践
    Spring Boot Serverless 实战系列“架构篇” 首发 | 光速入门函数计算
    基于 EMR OLAP 的开源实时数仓解决方案之 ClickHouse 事务实现
    【ClickHouse 技术系列】 在 ClickHouse 中处理实时更新
    LeetCode_Two Sum
    LeetCode_ Remove Element
    LeetCode_Same Tree
    LeetCode_Symmetric Tree
    LeetCode_Path Sum
    LeetCode_Merge Sorted Array
  • 原文地址:https://www.cnblogs.com/V587Chinese/p/9382873.html
Copyright © 2011-2022 走看看