zoukankan      html  css  js  c++  java
  • Python协程详解(二)

    上一章,我们介绍了Python的协程,并讲到用yield达到协程的效果,这一章,我们来介绍yield from的结构和作用

    我们先来对比下yield和yield from的用法

    def first_gen():
        for c in "AB":
            yield c
        for i in range(0, 3):
            yield i
    
    
    print(list(first_gen()))
    
    
    def second_gen():
        yield from "AB"
        yield from range(0, 3)
    
    
    print(list(second_gen()))
    

      

    运行结果:

    ['A', 'B', 0, 1, 2]
    ['A', 'B', 0, 1, 2]
    

      

    我们可以看到,两个方法都可以达到一样的效果,但是second_gen()方法比first_gen()方法来的简练,在second_gen()中使用yield from subgen()时,second_gen()是陷入阻塞的,真正在交互的是调用second_gen()的那一方,也就是调用方和subgen()在交互,second_gen()的阻塞状态会一直持续到subgen()调用完毕,才会接着执行yield from后续的代码

    再来看下面两个例子:

    第一个例子:

    def chain(*iterables):
        for it in iterables:
            yield from it
    
    
    s = "ABC"
    t = tuple(range(0, 3))
    print(list(chain(s, t)))
    

      

    运行结果:

    ['A', 'B', 'C', 0, 1, 2]
    

      

    第二个例子:

    from collections import Iterable
    
    
    def flatten(items, ignore_types=(str, bytes)):
        for x in items:
            if isinstance(x, Iterable) and not isinstance(x, ignore_types):
                yield from flatten(x)  # 这里递归调用,如果x是可迭代对象,继续分解
            else:
                yield x
    
    
    items = [1, 2, [3, 4, [5, 6], 7], 8]
    
    # Produces 1 2 3 4 5 6 7 8
    for x in flatten(items):
        print(x)
    
    items = ['Dave', 'Paula', ['Thomas', 'Lewis']]
    for x in flatten(items):
        print(x)
    

      

    运行结果:

    1
    2
    3
    4
    5
    6
    7
    8
    Dave
    Paula
    Thomas
    Lewis
    

      

    yield from item 表达式对item对象所做的第一件事,就是调用iter(item),从中获取迭代器,因此,item可以是任何一个可迭代的对象,在某些时候,yield from可以代替for循环,使得我们的代码更加的精炼,yield from是委派生成器,主要功能还是和yield一样,打开双向通道,把最外层的调用方和实际在传输值的生成器连接起来,这样,二者就可以发送和产出值

    下面一个例子,让我们用yield from计算平均值并返回统计报告,假设我们有若干男孩和女孩,我们分别拥有这些男孩和女孩的体重和身高,现在我们要分别计算男孩女孩的体重、身高的平均值

    from collections import namedtuple
    
    Result = namedtuple('Result', 'count average')
    
    
    def averager():  # <1>
        total = 0.0
        count = 0
        average = None
        while True:
            term = yield  # <2>
            if term is None:  # <3>
                break
            total += term
            count += 1
            average = total / count
        return Result(count=count, average=average)  # <4>
    
    
    def grouper(results, key):  # <5>
        while True:  # <6>
            results[key] = yield from averager()  # <7>
    
    
    def main(data):
        results = {}
        for key, values in data.items():
            group = grouper(results, key)
            next(group)  # <8>
            for value in values:
                group.send(value)
            group.send(None)  # <9>
        report(results)
    
    
    def report(results):
        for key, result in sorted(results.items()):
            group, unit = key.split(";")
            print('{:2} {:5} averaging {:.2f}{}'.format(result.count, group, result.average, unit))
    
    
    data = {
        'girls;kg':
            [40.9, 38.5, 44.3],
        'girls;m':
            [1.6, 1.51, 1.4],
        'boys;kg':
            [39.0, 40.8],
        'boys;m':
            [1.38, 1.5],
    }
    if __name__ == '__main__':
        main(data)
    

          

    运行结果:

     2 boys  averaging 39.90kg
     2 boys  averaging 1.44m
     3 girls averaging 41.23kg
     3 girls averaging 1.50m
    

        

    <1>中的yield标明averager是一个生成器,这里我们在grouper()方法中调用yield from averager(),所以averager是一个子生成器

    <2>main()函数中的代码发送值绑定到term上

    <3>当子生成器发现客户端代码发送一个None时,跳出循环,如果不那么做,averager()方法永远不会返回我们要的统计量,也就是Result对象,我们可以通过可以关闭委托生成器,但Result对象依旧无法返回

    <4>当调用方传入一个None值,子生成器跳出循环,并返回Result对象

    <5>yield from averager()表达式拿到上一个步骤返回的Result对象,赋值给左边的变量

    <6>这个循环每次迭代时会新建一个averager生成器的实例,每个实例都是作为协程使用的生成器对象

    <7>grouper会在yield from处暂停,等待调用方和子生成器交互完毕后,子生成器返回Result对象,grouper把返回的结果绑定到result[key]上

    <8>我们依旧用next()方法激活一个委派生成器,然后我们开始不断往委派生成器中输送值

    <9>当我们将值输送完毕后,传入None方法,使得自生成器内部跳出循环,并计算Result对象返回给grouper委托生成器存储起来

    让我们用协程模拟一个出租车运营场景,系统会创建几辆出租车,每辆车都会拉客,每辆车拉客的次数不同,而每次拉客都有乘客上车和乘客下车这一动作,出租车继续四处拉客,当到达各自拉客的次数后,出租车回家。

    出租车运营程序:

    import random
    import collections
    import queue
    
    DEFAULT_NUMBER_OF_TAXIS = 3
    DEFAULT_END_TIME = 180
    SEARCH_DURATION = 5
    TRIP_DURATION = 20
    DEPARTURE_INTERVAL = 5
    
    Event = collections.namedtuple('Event', ["time", "proc", "action"])
    
    
    def taxi_process(ident, trips, start_time=0):
        time = yield Event(start_time, ident, '出库')
        for i in range(trips):  
            time = yield Event(time, ident, '开始拉客')
            time = yield Event(time, ident, '乘客下车')
        yield Event(time, ident, '车入库')
    
    
    class Simulator:
        def __init__(self, procs_map):
            self.events = queue.PriorityQueue()
            self.procs = dict(procs_map)
    
        def run(self, end_time):
            for _, proc in sorted(self.procs.items()):
                first_event = next(proc)
                self.events.put(first_event)
            sim_time = 0
            while sim_time < end_time:
                if self.events.empty():
                    print('*** 事件结束 ***')
                    break
                current_event = self.events.get()
                sim_time, proc_id, previous_action = current_event
                print('taxi:', proc_id, proc_id * '   ', current_event)
                active_proc = self.procs[proc_id]
                next_time = sim_time + compute_duration(previous_action)
                try:
                    next_event = active_proc.send(next_time)
                except StopIteration:
                    del self.procs[proc_id]
                else:
                    self.events.put(next_event)
            else:
                msg = '*** end of simulation time: {} events pending ***'
                print(msg.format(self.events.qsize()))
    
    
    def compute_duration(previous_action):
        if previous_action in ['出库', '乘客下车']:
            interval = SEARCH_DURATION
        elif previous_action == '开始拉客':
            interval = TRIP_DURATION
        elif previous_action == '车入库':
            interval = 1
        else:
            raise ValueError('Unknown previous_action: %s' % previous_action)
        return int(random.expovariate(1 / interval)) + 1
    
    
    def main(end_time=DEFAULT_END_TIME, num_taxis=DEFAULT_NUMBER_OF_TAXIS):
        taxis = {i: taxi_process(i, (i + 1) * 2, i * DEPARTURE_INTERVAL)
                 for i in range(num_taxis)}
        print("出租车数量:", len(taxis))
        sim = Simulator(taxis)
        sim.run(end_time)
    
    
    if __name__ == '__main__':
        main()
    

          

    执行结果:

    出租车数量: 3
    taxi: 0  Event(time=0, proc=0, action='出库')
    taxi: 1     Event(time=5, proc=1, action='出库')
    taxi: 0  Event(time=6, proc=0, action='开始拉客')
    taxi: 0  Event(time=7, proc=0, action='乘客下车')
    taxi: 0  Event(time=9, proc=0, action='开始拉客')
    taxi: 2        Event(time=10, proc=2, action='出库')
    taxi: 2        Event(time=12, proc=2, action='开始拉客')
    taxi: 1     Event(time=24, proc=1, action='开始拉客')
    taxi: 1     Event(time=26, proc=1, action='乘客下车')
    taxi: 1     Event(time=28, proc=1, action='开始拉客')
    taxi: 0  Event(time=33, proc=0, action='乘客下车')
    taxi: 0  Event(time=36, proc=0, action='车入库')
    taxi: 2        Event(time=39, proc=2, action='乘客下车')
    taxi: 1     Event(time=41, proc=1, action='乘客下车')
    taxi: 2        Event(time=41, proc=2, action='开始拉客')
    taxi: 1     Event(time=45, proc=1, action='开始拉客')
    taxi: 1     Event(time=51, proc=1, action='乘客下车')
    taxi: 1     Event(time=52, proc=1, action='开始拉客')
    taxi: 1     Event(time=55, proc=1, action='乘客下车')
    taxi: 1     Event(time=61, proc=1, action='车入库')
    taxi: 2        Event(time=67, proc=2, action='乘客下车')
    taxi: 2        Event(time=70, proc=2, action='开始拉客')
    taxi: 2        Event(time=75, proc=2, action='乘客下车')
    taxi: 2        Event(time=80, proc=2, action='开始拉客')
    taxi: 2        Event(time=87, proc=2, action='乘客下车')
    taxi: 2        Event(time=90, proc=2, action='开始拉客')
    taxi: 2        Event(time=96, proc=2, action='乘客下车')
    taxi: 2        Event(time=97, proc=2, action='开始拉客')
    taxi: 2        Event(time=98, proc=2, action='乘客下车')
    taxi: 2        Event(time=106, proc=2, action='车入库')
    *** 事件结束 ***
    

          

    利用协程,我们可以看到每辆出租车各自的出库、拉客、乘客下车、再拉客,直到车入库虽然是顺序执行,但车与车之间的行为却是并行,这里我们分析一下主要的代码块

    首先是我们的taxi_process()方法:

    def taxi_process(ident, trips, start_time=0):
        time = yield Event(start_time, ident, '出库')
        for i in range(trips):  # <1>
            time = yield Event(time, ident, '开始拉客')
            time = yield Event(time, ident, '乘客下车')
        yield Event(time, ident, '车入库')
    

      

    这个方法会返回一个协程,这个协程控制着车的出库时间,循环拉客的次数以及入库时间,<1>处的trips就代表这辆车可以拉几次乘客

    再来是Simulator模块

    class Simulator:
        def __init__(self, procs_map):
            self.events = queue.PriorityQueue()  # <1>
            self.procs = dict(procs_map)  
    
        def run(self, end_time):
            for _, proc in sorted(self.procs.items()):
                first_event = next(proc)  # <2>
                self.events.put(first_event) 
            sim_time = 0
            while sim_time < end_time:
                if self.events.empty():  
                    print('*** 事件结束 ***')
                    break
                current_event = self.events.get()  
                sim_time, proc_id, previous_action = current_event  # <3>
                print('taxi:', proc_id, proc_id * '   ', current_event)
                active_proc = self.procs[proc_id]
                next_time = sim_time + compute_duration(previous_action) 
                try:
                    next_event = active_proc.send(next_time) 
                except StopIteration:
                    del self.procs[proc_id] 
                else:
                    self.events.put(next_event)  # <4>
            else:
                msg = '*** end of simulation time: {} events pending ***'
                print(msg.format(self.events.qsize()))  # <5>
    

      

    我们在<1>处生成一个队列,这个队列是后续协程在模拟出租车运营时需要用到的,之后我们copy一个字典,这个字典key是每辆车的id,value是每辆车的生成器

    在<2>处,我们激活每辆车的生成器,把每辆车生成器返回的初始状态存入事先准备好的队列中

    在<3>处,我们会从队列循环取出每辆车当前的状态,并从先前copy好的字典里取出每辆车的生成器,计算该车下一次行动的时间,并传入给生成器,如果生成器抛出StopIteration异常,则代表该生成器已结束,则在异常捕捉处从字典里删去该车的记录

    在我们往生成器传入值的时候,如果生成器没有报错,则会返回车的下一个状态,而我们把最新的状态存入队列中,如果循环,一直到队列为空为止,则跳出循环,又或者我们设定的初始时间大于终止时间,则跳出循环,进入到<5>打印队列中还有多少个事件

  • 相关阅读:
    C++中几个值得分析的小问题(1)
    《Effective C++》第5章 实现-读书笔记
    《Effective C++》第4章 设计与声明(2)-读书笔记
    《TCP/IP详解卷1:协议》第17、18章 TCP:传输控制协议(2)-读书笔记
    《Effective C++》第4章 设计与声明(1)-读书笔记
    《Effective C++》第3章 资源管理(2)-读书笔记
    【剑指Offer】24二叉树中和为某一值的路径
    【剑指Offer】23二叉搜索树的后序遍历序列
    【剑指Offer】22从上往下打印二叉树
    【剑指Offer】21栈的压入、弹出序列
  • 原文地址:https://www.cnblogs.com/beiluowuzheng/p/9066047.html
Copyright © 2011-2022 走看看