zoukankan      html  css  js  c++  java
  • 再议Python协程——从yield到asyncio

    协程,英文名Coroutine。
    前面介绍Python的多线程,以及用多线程实现并发(参见这篇文章【浅析Python多线程】),今天介绍的协程也是常用的并发手段。本篇主要内容包含:协程的基本概念、协程库的实现原理以及Python中常见的协程库。

    1 协程的基本概念

    我们知道线程的调度(线程上下文切换)是由操作系统决定的,当一个线程启动后,什么时候占用CPU、什么时候让出CPU,程序员都无法干涉。假设现在启动4个线程,CPU线程时间片为 5 毫秒,也就是说,每个线程每隔5ms就让出CPU,让其他线程抢占CPU。可想而知,等4个线程运行结束,要进行多少次切换?

    如果我们能够自行调度自己写的程序,让一些代码块遇到IO操作时,切换去执行另外一些需要CPU操作的代码块,是不是节约了很多无畏的上下文切换呢?是的,协程就是针对这一情况而生的。我们把写好的一个应用程序分为很多个代码块,如下图所示:

    把应用程序的代码分为多个代码块,正常情况代码自上而下顺序执行。如果代码块A运行过程中,能够切换执行代码块B,又能够从代码块B再切换回去继续执行代码块A,这就实现了协程(通常是遇到IO操作时切换才有意义)。示意图如下:

    所以,关于协程可以总结以下两点:

    (1)线程的调度是由操作系统负责协程调度是程序自行负责

    (2)与线程相比,协程减少了无畏的操作系统切换

    实际上当遇到IO操作时做切换才更有意义,(因为IO操作不用占用CPU),如果没遇到IO操作,按照时间片切换,无意义。

    举个例子,你在做一顿饭你要蒸饭和炒菜:最笨的方法是先蒸饭,饭蒸好了再去炒菜。这样一顿饭得花不少时间,就跟我们没采用并发编程一样。

    多线程相当于,你5分钟在做蒸饭的工作,到了5分钟开始炒菜,又过了5分钟,你又去忙蒸饭。

    协程相当于,你淘完米,放在电饭锅,按下煮饭键之后,你开始去炒菜。炒菜的时候油没热,你可以调佐料。这样,你炒两个菜出来,饭蒸好了。整个过程你没闲着,但是节约了不少时间。

    2 基于yield实现协程

    如1中所述,代码块A能够中断去执行代码块B,代码块B能够中断,执行代码块A。这不是和yield功能如出一辙吗?我们先回忆一下yield的功能:

    (1) 在函数中,语句执行到yield,会返回yield 后面的内容;当再回来执行时,从yield的下一句开始执行;
    (2) 使用yield语法的函数是一个生成器;
    (3) python3中,通过 .__next__() 或者 next() 方法获取生成器的下一个值。

    来看一个yield实现协程的例子:

    from collections import deque
    
    def sayHello(n):
        while n > 0:
            print("hello~", n)
            yield n
            n -= 1
        print('say hello')
    
    def sayHi(n):
        x = 0
        while x < n:
            print('hi~', x)
            yield
            x += 1
        print("say hi")
    
    # 使用yield语句,实现简单任务调度器
    class TaskScheduler(object):
        def __init__(self):
            self._task_queue = deque()
    
        def new_task(self, task):
            '''
            向调度队列添加新的任务
            '''
            self._task_queue.append(task)
    
        def run(self):
            '''
            不断运行,直到队列中没有任务
            '''
            while self._task_queue:
                task = self._task_queue.popleft()
                try:
                    next(task)
                    self._task_queue.append(task)
                except StopIteration:
                    # 生成器结束
                    pass
    
    sched = TaskScheduler()
    sched.new_task(sayHello(10))
    sched.new_task(sayHi(15))
    sched.run() 

    上例执行时,你会看到sayHello()和sayHi() 不断交替执行,当执行sayHello()时,在yield处中断,当执行sayHi()时从yield处中断,切换回sayHello()从yield之后的一句开始执行。。。如此来回交替无缝连接。

    3 基于yield实现actor模型

    actor模式是一种最古老的也是最简单的并行和分布式计算解决方案。下面我们通过yield来实现:

    from collections import deque
    
    class ActorScheduler:
        def __init__(self):
            self._actors = {}
            self._msg_queue = deque()
    
        def new_actor(self, name, actor):
            self._msg_queue.append((actor, None))
            self._actors[name] = actor
    
        def send(self, name, msg):
            actor = self._actors.get(name)
            if actor:
                self._msg_queue.append((actor, msg))
    
        def run(self):
            while self._msg_queue:
                # print("队列:", self._msg_queue)
                actor, msg = self._msg_queue.popleft()
                # print("actor", actor)
                # print("msg", msg)
                try:
                     actor.send(msg)
                except StopIteration:
                     pass
    
    
    if __name__ == '__main__':
        def say_hello():
            while True:
                msg = yield
                print("say hello", msg)
    
        def say_hi():
            while True:
                msg = yield
                print("say hi", msg)
    
        def counter(sched):
            while True:
                n = yield
                print("counter:", n)
                if n == 0:
                    break
                sched.send('say_hello', n)
                sched.send('say_hi', n)
                sched.send('counter', n-1)
    
        sched = ActorScheduler()
        # 创建初始化 actors
        sched.new_actor('say_hello', say_hello())
        sched.new_actor('say_hi', say_hi())
        sched.new_actor('counter', counter(sched))
    
        sched.send('counter', 10)
        sched.run()

    上例中:

    (1) ActorScheduler 负责事件循环
    (2) counter() 负责控制终止
    (3) say_hello() / say_hi() 相当于切换的协程,当程序运行到这些函数内部的yield处,就开始切换。

    所以,当执行时,我们能够看到say_hello() / say_hi()不断交替切换执行,直到counter满足终止条件之后,协程终止。看懂上例可能需要花费一些时间。实际上我们已经实现了一个“操作系统”的最小核心部分。 生成器函数(含有yield的函数)就是认为,而yield语句是任务挂起的信号。 调度器循环检查任务列表直到没有任务要执行为止。

    4 协程库的实现及asyncio

    有了前面对协程的了解,我们可以思考怎样去实现一个协程库?我觉得可以从以下两个个方面去思考:

    (1)事件循环 (event loop)。事件循环需要实现两个功能,一是顺序执行协程代码;二是完成协程的调度,即一个协程“暂停”时,决定接下来执行哪个协程。

    (2)协程上下文的切换。基本上Python 生成器的 yeild 已经能完成切换,Python3中还有特定语法支持协程切换。

    我们看一个比较复杂的例子:

    from collections import deque
    from select import select
    
    class YieldEvent:
        def handle_yield(self, sched, task):
            pass
    
        def handle_resume(self, sched, task):
            pass
    
    # 任务调度(相当于EventLoop)
    class Scheduler:
        def __init__(self):
            self._numtasks = 0         # 任务总数量
            self._ready = deque()      # 等待执行的任务队列
            self._read_waiting = {}    # 正等待读的任务
            self._write_waiting = {}   # 正等待写的任务
    
        # 利用I/O多路复用 监听读写I/0
        def _iopoll(self):
            rset, wset, eset = select(self._read_waiting,
                                      self._write_waiting, [])
            for r in rset:
                evt, task = self._read_waiting.pop(r)
                evt.handle_resume(self, task)
            for w in wset:
                evt, task = self._write_waiting.pop(w)
                evt.handle_resume(self, task)
    
        def new(self, task):
            """添加一个新的任务"""
            self._ready.append((task, None))
            self._numtasks += 1
    
        def add_ready(self, task, msg=None):
            """添加到任务对列等待执行"""
            self._ready.append((task, msg))
    
        def _read_wait(self, fileno, evt, task):
            self._read_waiting[fileno] = (evt, task)
    
        def _write_wait(self, fileno, evt, task):
            self._write_waiting[fileno] = (evt, task)
    
        def run(self):
            while self._numtasks:
                # 如果任务数量为空,阻塞在select处,保持监听
                if not self._ready:
                    self._iopoll()
                task, msg = self._ready.popleft()
                try:
                    r = task.send(msg)
                    if isinstance(r, YieldEvent):
                        r.handle_yield(self, task)
                    else:
                        raise RuntimeError('unrecognized yield event')
                except StopIteration:
                    self._numtasks -= 1
    
    # 示例: 将协程抽象成YieldEvent的子类,并重写handle_yield和handle_resume方法
    class ReadSocket(YieldEvent):
        def __init__(self, sock, nbytes):
            self.sock = sock
            self.nbytes = nbytes
    
        def handle_yield(self, sched, task):
            sched._read_wait(self.sock.fileno(), self, task)
    
        def handle_resume(self, sched, task):
            data = self.sock.recv(self.nbytes)
            sched.add_ready(task, data)
    
    class WriteSocket(YieldEvent):
        def __init__(self, sock, data):
            self.sock = sock
            self.data = data
    
        def handle_yield(self, sched, task):
            sched._write_wait(self.sock.fileno(), self, task)
    
        def handle_resume(self, sched, task):
            nsent = self.sock.send(self.data)
            sched.add_ready(task, nsent)
    
    class AcceptSocket(YieldEvent):
        def __init__(self, sock):
            self.sock = sock
    
        def handle_yield(self, sched, task):
            sched._read_wait(self.sock.fileno(), self, task)
    
        def handle_resume(self, sched, task):
            r = self.sock.accept()
            sched.add_ready(task, r)
    
    
    class Socket(object):
        def __init__(self, sock):
            self._sock = sock
    
        def recv(self, maxbytes):
            return ReadSocket(self._sock, maxbytes)
    
        def send(self, data):
            return WriteSocket(self._sock, data)
    
        def accept(self):
            return AcceptSocket(self._sock)
    
        def __getattr__(self, name):
            return getattr(self._sock, name)
    
    if __name__ == '__main__':
        from socket import socket, AF_INET, SOCK_STREAM
    
        def readline(sock):
            chars = []
            while True:
                c = yield sock.recv(1)
                print(c)
                if not c:
                    break
                chars.append(c)
                if c == b'
    ':
                    break
            return b''.join(chars)
    
        # socket server 使用生成器
        class EchoServer:
            def __init__(self, addr, sched):
                self.sched = sched
                sched.new(self.server_loop(addr))
    
            def server_loop(self, addr):
                s = Socket(socket(AF_INET, SOCK_STREAM))
                s.bind(addr)
                s.listen(5)
                while True:
                    c, a = yield s.accept()
                    print('Got connection from ', a)
                    print("got", c)
                    self.sched.new(self.client_handler(Socket(c)))
    
            def client_handler(self, client):
                while True:
                    try:
                        line = yield from readline(client)
                        if not line:
                            break
    
                        print("from Client::", str(line))
                    except Exception:
                        break
    
                    while line:
                        try:
                            nsent = yield client.sendall(line)
                            print("nsent", nsent)
                            line = line[nsent:]
                        except Exception:
                            break
                client.close()
                print('Client closed')
    
        sched = Scheduler()
        EchoServer(('localhost', 9999), sched)
        sched.run()  

    Scheduler相当于实现事件循环并调度协程, 添加到事件循环中的事件必须继承YieldEvent, 并重写它定义的两个方法。此例比较难,看不懂可以忽略。

    我们看一下Python3中的协程库asyncio是怎么实现的:

    import asyncio
    
    @asyncio.coroutine
    def say_hi(n):
        print("start:", n)
        r = yield from asyncio.sleep(2)
        print("end:", n)
    
    loop = asyncio.get_event_loop()
    tasks = [say_hi(0), say_hi(1)]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    
    # start: 1
    # start: 0
    # 停顿两秒
    # end: 1
    # end: 0

    (1)@asyncio.coroutine把一个generator标记为coroutine类型,然后,我们就把这个coroutine扔到EventLoop中执行。
    (2)yield from语法可以让我们方便地调用另一个generator。由于asyncio.sleep()也是一个coroutine,所以线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环。当asyncio.sleep()返回时,线程就可以从yield from拿到返回值(此处是None),然后接着执行下一行语句。
    (3)asyncio.sleep(1)相当于一个耗时1秒的IO操作,在此期间,主线程并未等待,而是去执行EventLoop中其他可以执行的coroutine了,因此可以实现并发执行。

    asyncio中get_event_loop()就是事件循环,而装饰器@asyncio.coroutine标记了一个协程,并yield from 语法实现协程切换。在Python3.5中,新增了asyncawait的新语法,代替装饰器和yield from。上例可以用新增语法完全代替。

    async def say_hi(n):
        print("start:", n)
        r = await asyncio.sleep(2)
        print("end:", n)
    
    loop = asyncio.get_event_loop()
    tasks = [say_hi(0), say_hi(1)]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    
    # start: 1
    # start: 0
    # 停顿两秒
    # end: 1
    # end: 0

    @asyncio.coroutine换成async, 将yield from 换成await  即可。

    5 协程的缺点

    (1)使用协程,只能使用单线程,多线程的便利就一点都用不到。例如,I/O阻塞程序,CPU仍然会将整个任务挂起直到操作完成。
    (2) 一旦使用协程,大部分ython库并不能很好的兼容,这就会导致要改写大量的标准库函数。
    所以,最好别用协程,一旦用不好,协程给程序性能带来的提升,远远弥补不了其带来的灾难。

      

     

      

     

  • 相关阅读:
    数据结构与算法(一)--数组
    Lucene学习
    java虚拟机面试题(JVM)
    Java开发面试题归类( 题目篇)
    java虚拟机学习(六)
    java虚拟机学习(五)--垃圾收集器总结
    21_异常_第21天(异常、企业面试题,思维导图下载)
    20_集合_第20天(Map、可变参数、Collections)
    19_集合_第19天(List、Set)
    18_集合框架_第18天(集合、Iterator迭代器、增强for循环 、泛型)
  • 原文地址:https://www.cnblogs.com/zingp/p/8678109.html
Copyright © 2011-2022 走看看