上一章,我们介绍了Python的协程,并讲到用yield达到协程的效果,这一章,我们来介绍yield from的结构和作用
我们先来对比下yield和yield from的用法
def first_gen(): for c in "AB": yield c for i in range(0, 3): yield i print(list(first_gen())) def second_gen(): yield from "AB" yield from range(0, 3) print(list(second_gen()))
运行结果:
['A', 'B', 0, 1, 2] ['A', 'B', 0, 1, 2]
我们可以看到,两个方法都可以达到一样的效果,但是second_gen()方法比first_gen()方法来的简练,在second_gen()中使用yield from subgen()时,second_gen()是陷入阻塞的,真正在交互的是调用second_gen()的那一方,也就是调用方和subgen()在交互,second_gen()的阻塞状态会一直持续到subgen()调用完毕,才会接着执行yield from后续的代码
再来看下面两个例子:
第一个例子:
def chain(*iterables): for it in iterables: yield from it s = "ABC" t = tuple(range(0, 3)) print(list(chain(s, t)))
运行结果:
['A', 'B', 'C', 0, 1, 2]
第二个例子:
from collections import Iterable def flatten(items, ignore_types=(str, bytes)): for x in items: if isinstance(x, Iterable) and not isinstance(x, ignore_types): yield from flatten(x) # 这里递归调用,如果x是可迭代对象,继续分解 else: yield x items = [1, 2, [3, 4, [5, 6], 7], 8] # Produces 1 2 3 4 5 6 7 8 for x in flatten(items): print(x) items = ['Dave', 'Paula', ['Thomas', 'Lewis']] for x in flatten(items): print(x)
运行结果:
1 2 3 4 5 6 7 8 Dave Paula Thomas Lewis
yield from item 表达式对item对象所做的第一件事,就是调用iter(item),从中获取迭代器,因此,item可以是任何一个可迭代的对象,在某些时候,yield from可以代替for循环,使得我们的代码更加的精炼,yield from是委派生成器,主要功能还是和yield一样,打开双向通道,把最外层的调用方和实际在传输值的生成器连接起来,这样,二者就可以发送和产出值
下面一个例子,让我们用yield from计算平均值并返回统计报告,假设我们有若干男孩和女孩,我们分别拥有这些男孩和女孩的体重和身高,现在我们要分别计算男孩女孩的体重、身高的平均值
from collections import namedtuple Result = namedtuple('Result', 'count average') def averager(): # <1> total = 0.0 count = 0 average = None while True: term = yield # <2> if term is None: # <3> break total += term count += 1 average = total / count return Result(count=count, average=average) # <4> def grouper(results, key): # <5> while True: # <6> results[key] = yield from averager() # <7> def main(data): results = {} for key, values in data.items(): group = grouper(results, key) next(group) # <8> for value in values: group.send(value) group.send(None) # <9> report(results) def report(results): for key, result in sorted(results.items()): group, unit = key.split(";") print('{:2} {:5} averaging {:.2f}{}'.format(result.count, group, result.average, unit)) data = { 'girls;kg': [40.9, 38.5, 44.3], 'girls;m': [1.6, 1.51, 1.4], 'boys;kg': [39.0, 40.8], 'boys;m': [1.38, 1.5], } if __name__ == '__main__': main(data)
运行结果:
2 boys averaging 39.90kg 2 boys averaging 1.44m 3 girls averaging 41.23kg 3 girls averaging 1.50m
<1>中的yield标明averager是一个生成器,这里我们在grouper()方法中调用yield from averager(),所以averager是一个子生成器
<2>main()函数中的代码发送值绑定到term上
<3>当子生成器发现客户端代码发送一个None时,跳出循环,如果不那么做,averager()方法永远不会返回我们要的统计量,也就是Result对象,我们可以通过可以关闭委托生成器,但Result对象依旧无法返回
<4>当调用方传入一个None值,子生成器跳出循环,并返回Result对象
<5>yield from averager()表达式拿到上一个步骤返回的Result对象,赋值给左边的变量
<6>这个循环每次迭代时会新建一个averager生成器的实例,每个实例都是作为协程使用的生成器对象
<7>grouper会在yield from处暂停,等待调用方和子生成器交互完毕后,子生成器返回Result对象,grouper把返回的结果绑定到result[key]上
<8>我们依旧用next()方法激活一个委派生成器,然后我们开始不断往委派生成器中输送值
<9>当我们将值输送完毕后,传入None方法,使得自生成器内部跳出循环,并计算Result对象返回给grouper委托生成器存储起来
让我们用协程模拟一个出租车运营场景,系统会创建几辆出租车,每辆车都会拉客,每辆车拉客的次数不同,而每次拉客都有乘客上车和乘客下车这一动作,出租车继续四处拉客,当到达各自拉客的次数后,出租车回家。
出租车运营程序:
import random import collections import queue DEFAULT_NUMBER_OF_TAXIS = 3 DEFAULT_END_TIME = 180 SEARCH_DURATION = 5 TRIP_DURATION = 20 DEPARTURE_INTERVAL = 5 Event = collections.namedtuple('Event', ["time", "proc", "action"]) def taxi_process(ident, trips, start_time=0): time = yield Event(start_time, ident, '出库') for i in range(trips): time = yield Event(time, ident, '开始拉客') time = yield Event(time, ident, '乘客下车') yield Event(time, ident, '车入库') class Simulator: def __init__(self, procs_map): self.events = queue.PriorityQueue() self.procs = dict(procs_map) def run(self, end_time): for _, proc in sorted(self.procs.items()): first_event = next(proc) self.events.put(first_event) sim_time = 0 while sim_time < end_time: if self.events.empty(): print('*** 事件结束 ***') break current_event = self.events.get() sim_time, proc_id, previous_action = current_event print('taxi:', proc_id, proc_id * ' ', current_event) active_proc = self.procs[proc_id] next_time = sim_time + compute_duration(previous_action) try: next_event = active_proc.send(next_time) except StopIteration: del self.procs[proc_id] else: self.events.put(next_event) else: msg = '*** end of simulation time: {} events pending ***' print(msg.format(self.events.qsize())) def compute_duration(previous_action): if previous_action in ['出库', '乘客下车']: interval = SEARCH_DURATION elif previous_action == '开始拉客': interval = TRIP_DURATION elif previous_action == '车入库': interval = 1 else: raise ValueError('Unknown previous_action: %s' % previous_action) return int(random.expovariate(1 / interval)) + 1 def main(end_time=DEFAULT_END_TIME, num_taxis=DEFAULT_NUMBER_OF_TAXIS): taxis = {i: taxi_process(i, (i + 1) * 2, i * DEPARTURE_INTERVAL) for i in range(num_taxis)} print("出租车数量:", len(taxis)) sim = Simulator(taxis) sim.run(end_time) if __name__ == '__main__': main()
执行结果:
出租车数量: 3 taxi: 0 Event(time=0, proc=0, action='出库') taxi: 1 Event(time=5, proc=1, action='出库') taxi: 0 Event(time=6, proc=0, action='开始拉客') taxi: 0 Event(time=7, proc=0, action='乘客下车') taxi: 0 Event(time=9, proc=0, action='开始拉客') taxi: 2 Event(time=10, proc=2, action='出库') taxi: 2 Event(time=12, proc=2, action='开始拉客') taxi: 1 Event(time=24, proc=1, action='开始拉客') taxi: 1 Event(time=26, proc=1, action='乘客下车') taxi: 1 Event(time=28, proc=1, action='开始拉客') taxi: 0 Event(time=33, proc=0, action='乘客下车') taxi: 0 Event(time=36, proc=0, action='车入库') taxi: 2 Event(time=39, proc=2, action='乘客下车') taxi: 1 Event(time=41, proc=1, action='乘客下车') taxi: 2 Event(time=41, proc=2, action='开始拉客') taxi: 1 Event(time=45, proc=1, action='开始拉客') taxi: 1 Event(time=51, proc=1, action='乘客下车') taxi: 1 Event(time=52, proc=1, action='开始拉客') taxi: 1 Event(time=55, proc=1, action='乘客下车') taxi: 1 Event(time=61, proc=1, action='车入库') taxi: 2 Event(time=67, proc=2, action='乘客下车') taxi: 2 Event(time=70, proc=2, action='开始拉客') taxi: 2 Event(time=75, proc=2, action='乘客下车') taxi: 2 Event(time=80, proc=2, action='开始拉客') taxi: 2 Event(time=87, proc=2, action='乘客下车') taxi: 2 Event(time=90, proc=2, action='开始拉客') taxi: 2 Event(time=96, proc=2, action='乘客下车') taxi: 2 Event(time=97, proc=2, action='开始拉客') taxi: 2 Event(time=98, proc=2, action='乘客下车') taxi: 2 Event(time=106, proc=2, action='车入库') *** 事件结束 ***
利用协程,我们可以看到每辆出租车各自的出库、拉客、乘客下车、再拉客,直到车入库虽然是顺序执行,但车与车之间的行为却是并行,这里我们分析一下主要的代码块
首先是我们的taxi_process()方法:
def taxi_process(ident, trips, start_time=0): time = yield Event(start_time, ident, '出库') for i in range(trips): # <1> time = yield Event(time, ident, '开始拉客') time = yield Event(time, ident, '乘客下车') yield Event(time, ident, '车入库')
这个方法会返回一个协程,这个协程控制着车的出库时间,循环拉客的次数以及入库时间,<1>处的trips就代表这辆车可以拉几次乘客
再来是Simulator模块
class Simulator: def __init__(self, procs_map): self.events = queue.PriorityQueue() # <1> self.procs = dict(procs_map) def run(self, end_time): for _, proc in sorted(self.procs.items()): first_event = next(proc) # <2> self.events.put(first_event) sim_time = 0 while sim_time < end_time: if self.events.empty(): print('*** 事件结束 ***') break current_event = self.events.get() sim_time, proc_id, previous_action = current_event # <3> print('taxi:', proc_id, proc_id * ' ', current_event) active_proc = self.procs[proc_id] next_time = sim_time + compute_duration(previous_action) try: next_event = active_proc.send(next_time) except StopIteration: del self.procs[proc_id] else: self.events.put(next_event) # <4> else: msg = '*** end of simulation time: {} events pending ***' print(msg.format(self.events.qsize())) # <5>
我们在<1>处生成一个队列,这个队列是后续协程在模拟出租车运营时需要用到的,之后我们copy一个字典,这个字典key是每辆车的id,value是每辆车的生成器
在<2>处,我们激活每辆车的生成器,把每辆车生成器返回的初始状态存入事先准备好的队列中
在<3>处,我们会从队列循环取出每辆车当前的状态,并从先前copy好的字典里取出每辆车的生成器,计算该车下一次行动的时间,并传入给生成器,如果生成器抛出StopIteration异常,则代表该生成器已结束,则在异常捕捉处从字典里删去该车的记录
在我们往生成器传入值的时候,如果生成器没有报错,则会返回车的下一个状态,而我们把最新的状态存入队列中,如果循环,一直到队列为空为止,则跳出循环,又或者我们设定的初始时间大于终止时间,则跳出循环,进入到<5>打印队列中还有多少个事件