zoukankan      html  css  js  c++  java
  • 终结python协程----从yield到actor模型的实现

    把应用程序的代码分为多个代码块,正常情况代码自上而下顺序执行。如果代码块A运行过程中,能够切换执行代码块B,又能够从代码块B再切换回去继续执行代码块A,这就实现了协程

    我们知道线程的调度(线程上下文切换)是由操作系统决定的,当一个线程启动后,什么时候占用CPU、什么时候让出CPU,程序员都无法干涉。假设现在启动4个线程,CPU线程时间片为 5 毫秒,也就是说,每个线程每隔5ms就让出CPU,让其他线程抢占CPU。可想而知,等4个线程运行结束,要进行多少次切换?

    如果我们能够自行调度自己写的程序,让一些代码块遇到IO操作时,切换去执行另外一些需要CPU操作的代码块,是不是节约了很多无畏的上下文切换呢?是的,协程就是针对这一情况而生的。我们把写好的一个应用程序分为很多个代码块,如下图所示:

    把应用程序的代码分为多个代码块,正常情况代码自上而下顺序执行。如果代码块A运行过程中,能够切换执行代码块B,又能够从代码块B再切换回去继续执行代码块A,这就实现了协程(通常是遇到IO操作时切换才有意义)。示意图如下:

    所以,关于协程可以总结以下两点:

    (1)线程的调度是由操作系统负责,协程调度是程序自行负责。

    (2)与线程相比,协程减少了无畏的操作系统切换。

    实际上当遇到IO操作时做切换才更有意义,(因为IO操作不用占用CPU),如果没遇到IO操作,按照时间片切换,无意义。

    python中的yield 关键字用来实现生成器,但是生成器在一定的程度上与协程其实也是差不多。我们来看个例子:

    def sayHello(n):
        while n > 0:
            print("hello~", n)
            yield n
            n -= 1
        print('say hello')
    
        
    if __name__ == "__main__":
        sayHello(5)  # 测试1
        # next(sayHello(5))  # 测试2
        
        # 测试3
        # for i in sayHello(5):
        #     pass

    挨个测试,你会发现第一个测试是不能通过的,什么都不会输出,这就是我们的生成器特性了,一旦函数内部有yield关键字,此函数就是生成器,只有调用next 或是 for之类的能够迭代的才能够使得生成器执行。那么这与我们的协程有什么关系呢?请看代码:

    from collections import deque
     
    def sayHello(n):
        while n > 0:
            print("hello~", n)
            yield n
            n -= 1
        print('say hello')
     
    def sayHi(n):
        x = 0
        while x < n:
            print('hi~', x)
            yield
            x += 1
        print("say hi")
     
    # 使用yield语句,实现简单任务调度器
    class TaskScheduler(object):
        def __init__(self):
            self._task_queue = deque()
     
        def new_task(self, task):
            '''
            向调度队列添加新的任务
            '''
            self._task_queue.append(task)
     
        def run(self):
            '''
            不断运行,直到队列中没有任务
            '''
            while self._task_queue:
                task = self._task_queue.popleft()
                try:
                    next(task)
                    self._task_queue.append(task)
                except StopIteration:
                    # 生成器结束
                    pass
    
    
    if __name__ == "__main__":
        sched = TaskScheduler()
        sched.new_task(sayHello(10))
        sched.new_task(sayHi(15))
        sched.run()

    代码运行下,你就发现了,这就是我们对协程的定义了。接下来我们说下actor模型。actor模式是一种最古老的也是最简单的并行和分布式计算解决方案。下面我们通过yield来实现:

    from collections import deque
     
    class ActorScheduler:
        def __init__(self):
            self._actors = {}
            self._msg_queue = deque()
     
        def new_actor(self, name, actor):
            self._msg_queue.append((actor, None))
            self._actors[name] = actor
     
        def send(self, name, msg):
            actor = self._actors.get(name)
            if actor:
                self._msg_queue.append((actor, msg))
     
        def run(self):
            while self._msg_queue:
                # print("队列:", self._msg_queue)
                actor, msg = self._msg_queue.popleft()
                # print("actor", actor)
                # print("msg", msg)
                try:
                     actor.send(msg)
                except StopIteration:
                     pass
     
     
    if __name__ == '__main__':
        def say_hello():
            while True:
                msg = yield
                print("say hello", msg)
     
        def say_hi():
            while True:
                msg = yield
                print("say hi", msg)
     
        def counter(sched):
            while True:
                n = yield
                print("counter:", n)
                if n == 0:
                    break
                sched.send('say_hello', n)
                sched.send('say_hi', n)
                sched.send('counter', n-1)
     
        sched = ActorScheduler()
        # 创建初始化 actors
        sched.new_actor('say_hello', say_hello())
        sched.new_actor('say_hi', say_hi())
        sched.new_actor('counter', counter(sched))
     
        sched.send('counter', 10)
        sched.run()

    (1) ActorScheduler 负责事件循环
    (2) counter() 负责控制终止
    (3) say_hello() / say_hi() 相当于切换的协程,当程序运行到这些函数内部的yield处,就开始切换。

    所以,当执行时,我们能够看到say_hello() / say_hi()不断交替切换执行,直到counter满足终止条件之后,协程终止。看懂上例可能需要花费一些时间。实际上我们已经实现了一个“操作系统”的最小核心部分。 生成器函数(含有yield的函数)就是认为,而yield语句是任务挂起的信号。 调度器循环检查任务列表直到没有任务要执行为止。

    而这就是廖雪峰的python官网教程里面的协程代码的最好解释,这也是之前一直在思考的问题,请看代码:

    def consumer():
        r = ''
        while True:
            n = yield r
            if not n:
                return
            print('[CONSUMER] Consuming %s...' % n)
            r = '200 OK'
    
    def produce(c):
        c.send(None)
        n = 0
        while n < 5:
            n = n + 1
            print('[PRODUCER] Producing %s...' % n)
            r = c.send(n)
            print('[PRODUCER] Consumer return: %s' % r)
        c.close()
    
    c = consumer()
    produce(c)

    我之前一直纳闷send()函数是如何激活生成器的,原来是实现了actor模型的协程!

    相关链接:再议Python协程——从yield到asyncio

  • 相关阅读:
    Linux YUM (Yellowdog Updater, Modified) Commands for Package Management
    awk命令例子详解
    sed命令例子详解
    浅谈XX系统跨平台迁移(测试环境)
    Postgres常用命令之增、删、改、查
    Postgres主备切换
    Postgres配置主从流复制
    Crontab定时任务
    GIL全局锁测试
    python try exception finally记录
  • 原文地址:https://www.cnblogs.com/zhiyong-ITNote/p/8717945.html
Copyright © 2011-2022 走看看