1 最简单的使用演示:
def simple_coroutine(): print('-> coroutine started') x = yield print('-> coroutine received:', x) my_coro = simple_coroutine() my_coro
Out[64]:
<generator object simple_coroutine at 0x7ffac9308048>
In [65]:
next(my_coro)
-> coroutine started
In [66]:
my_coro.send(42)
-> coroutine received: 42
--------------------------------------------------------------------------- StopIteration Traceback (most recent call last) <ipython-input-66-7c96f97a77cb> in <module> ----> 1 my_coro.send(42) StopIteration:
由于simple_coroutine()是一个生成器,所以在my_coro = simple_coroutine()调用时并不执行,此时的生成器处于未激活状态
直到调用next(my_coro)时,才运行到yield部分,并且只运行到yield部分,尚未赋值给到x
调用my_coro.send(42)时,相当于调用一次next,同时将yield赋值为42,则函数打印出received: 42,且由于找不到下一个yield,生成发出异常退出
2、生成器状态:
GEN_CREATED: 创建状态
GEN_RUNNING: 运行状态
GEN_SUSPENDED: 在yield表达式处暂停状态
GEN_CLOSE:关闭状态
生成器被调用时处于创建状态,在调用next时尚未执行到yield表达式处为运行状态(激活),一旦运行到表达式处,程序挂起,此时处于暂停状态,当程序找不到下一个yield时,生成器关闭,同时抛出关闭异常StopIteration
1中创建生成器后,直接调用my_coro.send(42)时,会发生如下错误
my_coro.send(55)
--------------------------------------------------------------------------- TypeError Traceback (most recent call last) <ipython-input-63-8cb330c8cec3> in <module> ----> 1 my_coro.send(55) TypeError: can't send non-None value to a just-started generator
关于状态的相关代码演示:
def simple_coro2(a): print('-> started: a =', a) b = yield a print('-> Received: b =', b) c = yield a + b print('-> Recevied: c =', c) my_coro2 = simple_coro2(14) from inspect import getgeneratorstate getgeneratorstate(my_coro2)
Out[12]:
'GEN_CREATED'
In [13]:
next(my_coro2)
-> started: a = 14 dd: 14
In [14]:
getgeneratorstate(my_coro2)
Out[14]:
'GEN_SUSPENDED'
In [15]:
my_coro2.send(28)
-> Received: b = 28 dd: 42
In [10]:
my_coro2.send(99)
-> Recevied: c = 99
--------------------------------------------------------------------------- StopIteration Traceback (most recent call last) <ipython-input-10-977093b924ab> in <module> ----> 1 my_coro2.send(99) StopIteration:
In [11]:
getgeneratorstate(my_coro2)
Out[11]:
'GEN_CLOSED'
从这个演示可以看到,函数被调用后处于创建状态,调用next后,达到暂停状态,至于运行状态,一旦next调用结束,运行状态也就结束了。
3 使用协程计算移动平均值
1)计算移动平均值
调用next,运行到yield右侧,由于第一次average是None,因此传递出来的是None,调用send时即为每次的计算出来的平均值
每次调用send时,都是运行到yield右侧,通过average将平均值传递出来
2 预激协程的装饰器
通过装饰器完成协程的激活过程,防止忘记调用激活函数next引发问题。
3 终止协程和异常处理
协程终止时会抛出StopIteration异常, 我们在程序中可以通过捕获此异常进行处理:
4 使用yield from
可改写成:
使用yield from 链接可迭代的对象
使用yield from 计算平均值,并输出统计报告
## 使用yield from 计算平均值,并输出统计报告 from collections import namedtuple Result = namedtuple('Result', 'count average') # 子生成器,计算出平均值 def averager(): total = 0.0 count = 0 average = None while True: term = yield # 具体的体重,身高值绑定到变量term上 if term is None: # 为空时,跳出循环,生成器终止 break total += term count += 1 average = total / count return Result(count, average) # 返回的Result会成为grouper函数中yield from 表达式的值 # 委派生成器 def grouper(results, key): while True: # 这个循环每次迭代时会新建一个averager实例,每个实例都是作为协程使用的生成器对象 results[key] = yield from averager() # grouper发送的每个值都会经由yield from处理,通过管道传给averager实例 # grouper会在yield from表达式处暂停,等待averager实例处理客户端发来的值。 # averager实例运行完毕后,返回的值绑定到results[key]上。while循环会不断 # 创建averager实例,处理更多的值 # 输出报告 def report(results): for key, result in sorted(results.items()): group, unit = key.split(';') print('{:2} {:5} averaging {:.2f}{}'.format(result.count, group, result.average, unit)) data = { 'girls;kg':[40.9, 38.5, 44.3, 42.2], 'girls;m':[1.6, 1.51, 1.4, 1.3], 'boys;kg':[39.0, 40.8, 43.2, 40.8], 'boys;m':[1.38, 1.5, 1.32, 1.25], } def main(data): results = {} for key, values in data.items(): group = grouper(results, key) # group是调用grouper函数得到的生成器对象,传给grouper函数的第一个参数是results, # 用于收集结果,第二个参数作为键。group作为协程使用。 next(group) # 预激group协程 for value in values: group.send(value) # 把各个value传给grouper.传入的值最终到达averager函数中的term = yield那一行。 # grouper永远不知道传入值是什么。 group.send(None) # 把None传入grouper, 导致当前的averager实例终止,也让grouper继续执行,再创建一个averager实例, # 处理下一组值 report(results) main(data)
5 离散事件仿真
离散事件仿真是一种把系统建模成一系列事件的仿真类型。在离散事件仿真中,仿真“钟”向前推进的量不是固定的,而是直接推进到下一个事件模型的模拟时间。假如我们抽象模拟出租车的运营过程,其中一个事件是乘客上车,下一个事件则是乘客下车。不管乘客坐了5分钟还是50分钟,一旦乘客下车,仿真钟就会更新,指向此次运营的结束时间。使用离散事件仿真可以在不到一秒钟的时间内模拟一年的出租车运营过程。
仿真程序taxi_sim.py会创建几辆出租车,每辆车会拉几个乘客,然后回家。出租车首先驶离车库,四处徘徊,寻找乘客;拉到乘客后,行程开始;乘客下车后,继续四处徘徊。
四处徘徊和行程所用的时间使用指数分布生在。为了让显示的信息更加整洁,时间使用取整的分钟数。每辆出租车每次的状态变化都是一个事件。
本程序将离散事件(并发)整理到一个序列中执行。
出租车每隔5分钟从车库中出发
0号出租车2分钟后拉到乘客(time=2),1号出租车3分钟后拉到乘客(time=8),2号出租车5分钟后拉到乘客(time=15)
0号出租车拉了两个乘客:第一个乘客从time=2时上车,到time=18时下车,第二个乘客从time=28时上车,到time=65时下车-----这是此次仿真中最长的行程
1号出租车拉了四个乘客,在time=110时回家
2号出租车拉了六个乘客,在time=109时回家。这辆车最后一次行程从time=97时开始,只持续了一分钟。
1号出租车的第一次 行程从time=8时开始,在这个过程中2号出租车离开了车库(time=10),而且完成了两次行程。
在此次运行示例中,所有排定的事件都在默认的仿真时间内(180分钟)完成。最后一次事件发生在time=110时。
在Event实例中,time字段是事件发生时的仿真时间,proc字段是出租车进程实例的编号,action字段是描述活动的字符串。
附源代码:taxi_sim.py
import random import collections import queue import argparse DEFAULT_NUMBER_OF_TAXIS = 3 DEFAULT_END_TIME = 180 SEARCH_DURATION = 5 TRIP_DURATION = 20 DEPARTURE_INTERVAL = 5 Event = collections.namedtuple('Event', 'time proc action') # 出租车进程。 def taxi_process(ident, trips, start_time=0): """每次状态变化时向仿真程序产出一个事件""" time = yield Event(start_time, ident, 'leave garage') for i in range(trips): time = yield Event(time, ident, 'pick up passenger') time = yield Event(time, ident, 'drop off passenger') yield Event(time, ident, 'going home') # 结束出租车进程 # 出租车仿真程序主程序。 class Simulator: def __init__(self, procs_map): self.events = queue.PriorityQueue() # 优先级队列,put方法放入数据,一般是一个数组(3, someting),get()方法数值小的优先出队 self.procs = dict(procs_map) # 创建字典的副本 def run(self, end_time): """调度并显示事件,直到事件结束""" # 调度各辆出租车的第一个事件 for _, proc in sorted(self.procs.items()): first_event = next(proc) # 第一个事件是所有车离开车库,也是为了激活子生成器 self.events.put(first_event) # 所有车的第一个事件放到优先队列中,time小的优先出来 # 此次仿真的主循环 sim_time = 0 while sim_time < end_time: if self.events.empty(): print('***事件结束***') break current_event = self.events.get() # 取出time最小的事件 sim_time, proc_id, previous_action = current_event # 元组解包 print('taxi:', proc_id, proc_id * ' ', current_event) active_proc = self.procs[proc_id] # 取出当前事件对象。是一个子生成器对象,下面要对这个对象send(time)来获得下一个yield的返回值 next_time = sim_time + comput_duration(previous_action) # 随机计算下一个时间 try: next_event = active_proc.send(next_time) # 下一个事件是子生成器执行到下一个yield的返回值 except StopIteration: # StopIteration异常说明当前子生成器执行完毕,从字典中删除它 del self.procs[proc_id] else: self.events.put(next_event) # 否则就把下一个事件放入优先队列中 else: # 如果while循环没有以break结束,那么输出结束信息 msg = '*** 仿真结束。{}个车没有回家 ***' print(msg.format(self.events.qsize())) # 仿真结束 def comput_duration(previous_action): """使用指数分布计算操作的耗时""" if previous_action in ['leave garage', 'drop off passenger']: interval = SEARCH_DURATION elif previous_action == 'pick up passenger': # 新状态是行程开始 interval = TRIP_DURATION elif previous_action == 'going home': interval = 1 else: raise ValueError('未知的活动:{}'.format(previous_action)) return int(random.expovariate(1/interval) + 1) def main(end_time=DEFAULT_END_TIME, num_taxis=DEFAULT_NUMBER_OF_TAXIS, seed=None): # 构建随机生成器,构建过程,运行仿真程序 if seed is not None: random.seed(seed) # 指定seed的值时,用这个seed可以使随机数随机出来的相等 taxis = {i: taxi_process(i, (i+1*2), i*DEPARTURE_INTERVAL) for i in range(num_taxis)} # 字典生成式,生成指定数量的子生成器对象 sim = Simulator(taxis) # 实例化仿真主循环 sim.run(end_time) # run it! if __name__ == '__main__': parser = argparse.ArgumentParser(description='出租车运行仿真') # 创建参数解析对象,添加描述 parser.add_argument('-e', '--end_time', type=int, default=DEFAULT_END_TIME) # 添加-e参数,默认值为180 parser.add_argument('-t', '--taxis', type=int, default=DEFAULT_NUMBER_OF_TAXIS, help='出租车出行数量, default=%s' %DEFAULT_NUMBER_OF_TAXIS) # 添加-t参数,用来指定出租车数量,默认值为3 parser.add_argument('-s', '--seed', type=int, default=3, help='随机生成seed') # 添加-s参数,用来设置seed值,如果seed值一样那么随机出来的结果也会一样,默认值为None args = parser.parse_args() # 这个函数用来获取参数 main(args.end_time, args.taxis, args.seed) # 通过上面函数的属性的到输入的参数,属性可以是双横线后的字符串也可以是添加参数函数的第一个不加横线的字符串
输出结果:
taxi: 0 Event(time=0, proc=0, action='leave garage') taxi: 0 Event(time=2, proc=0, action='pick up passenger') taxi: 1 Event(time=5, proc=1, action='leave garage') taxi: 1 Event(time=8, proc=1, action='pick up passenger') taxi: 2 Event(time=10, proc=2, action='leave garage') taxi: 2 Event(time=15, proc=2, action='pick up passenger') taxi: 2 Event(time=17, proc=2, action='drop off passenger') taxi: 0 Event(time=18, proc=0, action='drop off passenger') taxi: 2 Event(time=18, proc=2, action='pick up passenger') taxi: 2 Event(time=25, proc=2, action='drop off passenger') taxi: 1 Event(time=27, proc=1, action='drop off passenger') taxi: 2 Event(time=27, proc=2, action='pick up passenger') taxi: 0 Event(time=28, proc=0, action='pick up passenger') taxi: 2 Event(time=40, proc=2, action='drop off passenger') taxi: 2 Event(time=44, proc=2, action='pick up passenger') taxi: 1 Event(time=55, proc=1, action='pick up passenger') taxi: 1 Event(time=59, proc=1, action='drop off passenger') taxi: 0 Event(time=65, proc=0, action='drop off passenger') taxi: 1 Event(time=65, proc=1, action='pick up passenger') taxi: 2 Event(time=65, proc=2, action='drop off passenger') taxi: 2 Event(time=72, proc=2, action='going home') taxi: 0 Event(time=76, proc=0, action='going home') taxi: 1 Event(time=80, proc=1, action='drop off passenger') taxi: 1 Event(time=88, proc=1, action='going home') ***事件结束***