zoukankan      html  css  js  c++  java
  • 流畅的python,Fluent Python 第十六章笔记 (协程)

    顶格标注,初学者看这个一定要记住这么几个概念:

    1、next(x)与s.send(None)的一样的效果,需要给生成器send消息,必须先执行前面的命令,让生成器预激,就是走到能够互动给值的地方。

    2、yield from x   x是可迭代对象既可,因为它首相会先对x进行iter(x)返回的副本(迭代器是自身,可迭代对象是创建一个自身的迭代器返回)进行操作。

    3、send就是比next多了一个发送给生成器暂停位置值的作用,所以send一次,必须会促使迭代器向下走一次。初学者看字面会觉的是发送消息,其实如果生成器yield有值产出,也能接收。

    send这个词汇不好,应该取一个send_next更加合理。

    4、协程只需要在send的时候进行预激,另外throw,send,close,都不需要预激。

    给自己的记号:

    生成器一般仍什么错误,上浮什么错误,但扔StopIteration

    上浮RuntimeError!!!

    In [590]: i.throw(StopIteration)                                                                     
    ---------------------------------------------------------------------------
    StopIteration                             Traceback (most recent call last)
    <ipython-input-588-e668612914cb> in <genexpr>(.0)
    ----> 1 i= (i for i in range(10))
    
    StopIteration:
    
    The above exception was the direct cause of the following exception:
    
    RuntimeError                              Traceback (most recent call last)
    <ipython-input-590-6828ce324924> in <module>
    ----> 1 i.throw(StopIteration)
    
    RuntimeError: generator raised StopIteration
    

     

    协程是指一个过程,这个过程与调用方协作,产出由调用方提供的值。

    16.2 用作协程的生成器的基本行为

    协程可以通过inspect.getgenerationstate(...)来返回协程状态

    'GEN_CREATED'  等待开始执行

    'GEN_RUNNING'  解释器正在执行

    'GEN_SUSPENDED'  在yield表达式暂停

    'GEN_CLOSE' 执行结束

    协程在执行send(xxx)具体内容,必须预激协程,可以通过next(gen)或gen.send(None)

    In [473]: def simple_coro2(a): 
         ...:     print('-> Started: a =', a) 
         ...:     b = yield a 
         ...:     print('-> Received: b =', b) 
         ...:     c = yield a + b 
         ...:     print('-> Received: c =', c) 
         ...:                                                                                            
    
    In [474]: my_core2 = simple_coro2(14)            # 生成generator                                                    
    
    In [475]: from inspect import getgeneratorstate                                                      
    
    In [476]: getgeneratorstate(my_core2)                                                                
    Out[476]: 'GEN_CREATED'
    
    In [477]: next(my_core2)                            # 预激协程                                                 
    -> Started: a = 14
    Out[477]: 14
    
    In [478]: getgeneratorstate(my_core2)                                                                
    Out[478]: 'GEN_SUSPENDED'
    
    In [479]: my_core2.send(28)                               # 发送数据                                           
    -> Received: b = 28
    Out[479]: 42
    
    In [480]: my_core2.send(99)                                  # 发送数据                                        
    -> Received: c = 99
    ---------------------------------------------------------------------------
    StopIteration                             Traceback (most recent call last)
    <ipython-input-480-f1e922706109> in <module>
    ----> 1 my_core2.send(99)
    
    StopIteration: 
    
    In [481]: getgeneratorstate(my_core2)                                                                
    Out[481]: 'GEN_CLOSED'
    

     其实这种b = yield a可以理解为一个双向通道,当执行,只不过这个双向通道是分开执行的。

    当停滞在yield a的时候,已经生产出a了,这个时候你如果执行send,可以给b对象,并且生成器向下执行到下一个yield xxx产出。

    所以,如果下一个yield后面有跟对象产出,这个send就是想送一个东西进去,然后产出一个东西出来。

    16.3 示例:使用协程计算移动平均值

    In [482]: def averager(): 
         ...:     total = 0.0 
         ...:     count = 0 
         ...:     average = None 
         ...:     while True: 
         ...:         term = yield average 
         ...:         total += term 
         ...:         count += 1 
         ...:         average = total / count 
         ...:                                                                                            
    
    In [483]: coro = average_gen()                                                                       
    
    In [484]: coro.send(None)                # 预激协程                                                               
    Out[484]: 0
    
    In [485]: coro = average_gen()                                                                       
    
    In [486]: next(coro)                      # 另外一种预激协程                                                           
    Out[486]: 0
    
    In [487]: coro.send(10)                                                                              
    Out[487]: 10.0
    
    In [488]: coro.send(20)                                                                              
    Out[488]: 15.0
    
    In [489]: coro.send(30)                                                                              
    Out[489]: 20.0
    

    16.4预激协程的装饰器

    先上装饰器

    from functools import wraps
    
    def coroutine(func):
        @wraps(func)
        def primer(*args, **kwargs):
            gen = func(*args, **kwargs)
            next(gen)     # 预激协程
            return gen
        return primer
    
    from coroutil import coroutine
    
    
    @coroutine
    def averager():
        total = 0.0
        count = 0
        average = None
        while True:
            term = yield average
            total += term
            count += 1
            average = total / count
    
    
    
    if __name__ == '__main__':
        aver = averager()
        print(aver.send(10))
        print(aver.send(20))
        print(aver.send(30))
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/Fluent_Python/第十六章/coroaverager0.py
    10.0
    15.0
    20.0
    
    Process finished with exit code 0
    

    16.5终止协程和异常处理

    next与close不需要预激协程,throw与send需要预激协程

    from inspect import getgeneratorstate
    
    class DemoException(Exception):
        """演示用的"""
    
    def demo_exc_handling():
        print('-> coroutine started')
        count = 0
        while True:
            try:
                x = yield f'info->({count})'    # 测试定位用
            except DemoException:
                print("*** DemoException handled. Continuing...")
            else:
                print(f'-> coroutine received:{x!r}')
            count += 1
    
    
    def throw():
        exc_coro = demo_exc_handling()
        print(exc_coro.send(None))     # 预激协程
        print(exc_coro.throw(DemoException))
        print(getgeneratorstate(exc_coro))
    
    def close():
        exc_coro = demo_exc_handling()
        print(exc_coro.send(None))
        print(exc_coro.close())     # 没有返回值,不需要预激携程
        print(getgeneratorstate(exc_coro))
    
    
    if __name__ == '__main__':
        throw()
        print('='*20)
        close()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/Fluent_Python/第十六章/coro_exc_demo.py
    -> coroutine started
    info->(0)
    *** DemoException handled. Continuing...
    info->(1)
    GEN_SUSPENDED
    ====================
    -> coroutine started
    info->(0)
    None
    GEN_CLOSED
    
    Process finished with exit code 0
    

    16.6让协程返回值

    In [492]: from collections import namedtuple 
         ...:  
         ...: Result = namedtuple('Result', 'count average') 
         ...:  
         ...:  
         ...: def averager(): 
         ...:     total = 0.0 
         ...:     count = 0 
         ...:     average = None 
         ...:     while True: 
         ...:         term = yield 
         ...:         if term is None:    # 判断输入是否为None 
         ...:             break 
         ...:         total += term 
         ...:         count += 1 
         ...:         average = total / count 
         ...:     return Result(count, average)  # 返回StopIteration 
         ...:                                                                                            
    
    In [493]: aver = averager()                                                                          
    
    In [494]: next(aver)                                                                                 
    
    In [495]: aver.send(10)                                                                              
    
    In [496]: aver.send(20)                                                                              
    
    In [497]: aver.send(None)                                                                            
    ---------------------------------------------------------------------------
    StopIteration                             Traceback (most recent call last)
    <ipython-input-497-19d1f05a0674> in <module>
    ----> 1 aver.send(None)
    
    StopIteration: Result(count=2, average=15.0)
    

     前面14章我自己理解关于生成器碰到return或者没有yield生成的时候,(20年2月28日补充日记,其实就是找不到下一个yield的时候)会上报StopIteration,但这个return返回的对象尽然包含在StopIteration的属性里面。

    那就可以想办法取出来了.

    In [498]: aver = averager()                                                                          
    
    In [499]: aver.send(None)                                                                            
    
    In [500]: aver.send(10)                                                                              
    
    In [501]: aver.send(20)                                                                              
    
    In [502]: try: 
         ...:     aver.send(None) 
         ...: except StopIteration as s: 
         ...:     result = s 
         ...:                                                                                            
    
    In [503]: dir(result)                                                                                
    Out[503]: 
    ['__cause__',
     '__class__',
     '__context__',
     '__delattr__',
     '__dict__',
     '__dir__',
     '__doc__',
     '__eq__',
     '__format__',
     '__ge__',
     '__getattribute__',
     '__gt__',
     '__hash__',
     '__init__',
     '__init_subclass__',
     '__le__',
     '__lt__',
     '__ne__',
     '__new__',
     '__reduce__',
     '__reduce_ex__',
     '__repr__',
     '__setattr__',
     '__setstate__',
     '__sizeof__',
     '__str__',
     '__subclasshook__',
     '__suppress_context__',
     '__traceback__',
     'args',
     'value',
     'with_traceback']
    
    In [504]: res = result.value                                                                         
    
    In [505]: res                                                                                        
    Out[505]: Result(count=2, average=15.0)
    
    In [506]:                                                                                            
    

    16.7使用 yield from

    In [506]: def demo(): 
         ...:     yield from 'abcd' 
         ...:     yield from range(10) 
         ...:      
         ...:                                                                                            
    
    In [507]: list(demo())                                                                               
    Out[507]: ['a', 'b', 'c', 'd', 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    
    In [508]:  
    

     yield from x 表达式对x对象所做的第一件事是,调用iter(x),从中获取迭代器。因此x可以是任何可迭代对象。

    yield from的主要功能是打开双向通道,把最外层的调用方法与最内层的子生成器链接起来,这样二者可以直接发送和产出值,还可以直接传入异常,而不用在位于中间的协程中添加大量处理异常的样板代码。

    委派生成器:

    包含yield from <iterable> 表达式的生成器函数

    我的理解方式是:委派生成器就像农夫山泉的广告,它其实不生产元素,它只不过是元素的搬运工,提供了一个管理。

    当然你可以在生成器函数里面执行一些代码,但也就预激的时候运行一下,后面加入想获取子生成器的return的时候会再执行一次next。

    委派生成器除了能够获取出子生成器的StopIteration里面的return 的result,在调用方与子生成器的互动中,不能获取双法发送的信息。

    子生成器:

    从yield from表达式中<iterable>部分获取的生成器,这就是PEP380的标题中所有的"子生成器"

    这个才是真正的底层工作的生成器,它里面有一个yield在辛苦的坐着底层劳动,然后通过委派生成器与最外面的调用方互动。

    调用方

    PEP380使用"调用方"这个术语指代调用委托生成的客户端代码。

    这个主要是与子生成器互动的,根据需求,可以直接获得子生成器yield出来的值,也可以send给子生成器值,发送结束命令后,获取res结果。

    from collections import namedtuple
    
    Result = namedtuple('Result', 'count average')
    
    # 子生成器
    def averager():
        total = 0.0
        count = 0
        average = None
        while True:
            term = yield
            if term is None:    # 判断输入是否为None
                break
            total += term
            count += 1
            average = total / count
        return Result(count, average)  # 返回StopIteration
    
    def second():      # 自己添加的二层委派生成器
        results = yield from averager()    # 获取最终的子生成器的res
        return results            # 获取以后直接通过StopIteration的value传出结果
    
    
    
    # 委派生成器
    def grouper(results, key):
        # while True:      # 书中这里用了while True当然这是非常好的,网上一堆代码也都这样写。
        # 有人提问为什么要while True至少百度没有啥正确答案,通过重复测试。
        # result在获得值以后grouper被重新激活,按照生成器的原理会继续向下走,但去掉了while向下走。
        # 会产生函数默认返回值return None,同时委派生成器的StopIteration报错。
        # 我这里选择加再加一个yield,这也可以避免StopIteration报错,书中的while方式当然也可以
        # 但感觉没有添加一个while快,因为再次进去 yield from averager()会再次创建一个averager()子生成器
        # 但调用方后续新建一个results[key] 与 averager()子生成器,前面刚刚新建的averager()子生成器被垃圾回收。
        # 所以再添加一个yield可以避免发生空余新建averager()子生成器,当然你也可以在调用方设置try,except来捕获异常,但不用任何操作。
        results[key] = yield from second()
        yield
    
    
    # 客户端代码,既调用方
    def main(data):
        results = {}    # 为了捕获委派生成器的结果,定义一个可变参数接收委派生成器的产出结果,是一个非常不错的主意。
        for key, values in data.items():
            # group 是调用grouper函数得到的生成器对象,传给grouper 函数的第一个参数是results,用于收集结果;第二个是某个键
            group = grouper(results, key)  # 给委派生成器传递可变参数字典,与key
            next(group)     # 这个是预激委派生成器,后面所有的委派生成器或者子生成器都会自动被预激
            for value in values:
                # 通过grouper 把value传入的值到达averager函数中;
                # grouper并不知道传入的是什么,同时grouper实例在yield from处暂停
                group.send(value)
            # 把None传入groupper,传入的值最终到达averager函数中,导致当前实例终止。然后继续创建下一个实例。
            # 如果没有group.send(None),那么averager子生成器永远不会终止,委派生成器也永远不会在此激活,也就不会为result[key]赋值
            group.send(None)
            # 子生成器停止并返回值的时候,委派生成器激活,并向下或者在循环体内执行。
        report(results)
    
    
    # 输出报告
    def report(results):
        for key, result in sorted(results.items()):
            group, unit = key.split(';')
            print('{:2} {:5} averaging {:.2f}{}'.format(result.count, group, result.average, unit))
    
    
    data = {
        'girls;kg':[40, 41, 42, 43, 44, 54],
        'girls;m': [1.5, 1.6, 1.8, 1.5, 1.45, 1.6],
        'boys;kg':[50, 51, 62, 53, 54, 54],
        'boys;m': [1.6, 1.8, 1.8, 1.7, 1.55, 1.6],
    }
    
    if __name__ == '__main__':
        main(data)
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/Fluent_Python/第十六章/coroaverager3.py
     6 boys  averaging 54.00kg
     6 boys  averaging 1.68m
     6 girls averaging 44.00kg
     6 girls averaging 1.58m
    
    Process finished with exit code 0
    

     注释已经很仔细了。

    如果你不需要拿回子生成器的return(就是子生成器函数不能停下来,因为一旦停下来,函数就会有返回值,就算你没设置也会返回None),

    只想拿子生成器产出的值,由于没有return,所以这个子生成器只能是一个无线迭代的生成器。

    In [537]: def c_gen(): 
         ...:     i = 0 
         ...:     while True: 
         ...:         yield i 
         ...:         i += 1 
         ...:                                                                                            
    
    In [538]: def wei1(): 
         ...:     yield from c_gen() 
         ...:                                                                                            
    
    In [539]: def wei2(): 
         ...:     yield from wei1() 
         ...:                                                                                            
    
    In [540]: w = wei2()                                                                                 
    
    In [541]: next(w)                                                                                    
    Out[541]: 0
    
    In [542]: next(w)                                                                                    
    Out[542]: 1
    
    In [543]: next(w)                                                                                    
    Out[543]: 2
    
    In [544]: def c_gen(): 
         ...:     return (i for i in range(4)) 
         ...:      
         ...:                                                                                            
    
    In [545]: w = wei2()                                                                                 
    
    In [546]: next(w)                                                                                    
    Out[546]: 0
    
    In [547]: list(w)                                                                                    
    Out[547]: [1, 2, 3]
    
    In [548]:  
    

     这是一个不获取子生成器return值的示例,通过两层委派生成器,执行最后的委派生成器,跟直接执行子生成器效果一样。

    这个就很好的显示了管道的力量。

    In [548]: def c_gen(): 
         ...:     return iter('abc') 
         ...:      
         ...:                                                                                            
    
    In [549]: w = wei2()                                                                                 
    
    In [550]: next(w)                                                                                    
    Out[550]: 'a'
    
    In [551]: w.send('a')                                                                                
    ---------------------------------------------------------------------------
    AttributeError                            Traceback (most recent call last)
    <ipython-input-551-77ce452bcf0e> in <module>
    ----> 1 w.send('a')
    
    <ipython-input-539-fc4381f76d91> in wei2()
          1 def wei2():
    ----> 2     yield from wei1()
          3 
    
    <ipython-input-538-91d3b9c6a7ef> in wei1()
          1 def wei1():
    ----> 2     yield from c_gen()
          3 
    
    AttributeError: 'str_iterator' object has no attribute 'send'
    

     最后我把子生成器返回一个迭代器,因为迭代器没有send方法,所以,就报错了,通过委派生成器层层传递到最外层。

    我把前面这些理解了,再取理解yield from 到底再做什么就方便多了。

    我先上书中的简化版的逻辑。

    后面又花了几个小时仔细研究了这个代码:这不就是只有一个yield的的生成器吗?只不过进行了一系列封装。

    RESULT = yield from EXPR
    
    
    # 模拟具体效果
    _i = iter(EXPR)    # 对EXPR进行iter操作,必须返回一个迭代器
    try:
        _y = next(_i)   # 为什么yield from的子生成器不用预激,就是这里帮你预激了
    except StopIteration as _e:   # 取出第一次预激就StopIteration,那准备接收返回值,也没必要yield
        _r = _e.value   # 拿出返回值
    else:        #20年2月29日补充,这个才是真正实现管道的地方,委派生成器确实可以预激活子生成器,但其后面的运行又会产生一个生成器,所以在主函数与子生成器进行交换的时候,还是要先预激活委派生成器
        while 1:       # 这里就是为什么yield from 能够取代for循环的原因了(运行这个循环时,委派生成器会阻塞,只作为调用方与子生成器之间的通道)。
            _s = yield _y     # 产出前面预激的时候获取的值给调用者,这里面yield暂停(进入了管道里面),等待调用者发送信息。
            try:
                _y = _i.send(_s)  # 将委派生成器接收到的_s发送给_s子生成器
            except StopIteration as _e:  # 如果接收到StopIteration,从子生成器中拿出返回值
                _r = _e.value
                break
    RESULT = _r   # 赋值给他 是不是发现少了什么,少了一个yield
    

     已经讲的很啰嗦了。

    • _i 迭代器(子生成器)

    • _y 产出的值 (子生成器产出的值)

    • _r 结果 (最终的结果 即整个yield from表达式的值)

    • _s 发送的值 (调用方发给委派生成器的值,这个只会传给子生成器)

    • _e 异常 (异常对象)

    上面的例子是子生成器经过iter(x)以后必定有send方法,但并不是所有的迭代器独有send方法,而且很多报错的逻辑上浮也没写。

    上一个书中的终极版本:

    RESULT = yield from EXPR
    
    
    # EXPR 可以是任何可迭代对象,因为获取迭代器_i 使用的是iter()函数。
    _i = iter(EXPR)    # 对EXPR进行iter操作,必须返回一个迭代器
    try:
        _y = next(_i) #    预激字生成器,结果保存在_y 中,作为第一个产出的值
    except StopIteration as _e:     # 3 如果调用的方法抛出StopIteration异常,获取异常对象的value属性,赋值给_r
        _r = _e.value
    else:
        while 1:             # 4 运行这个循环时,委派生成器会阻塞,只能作为调用方和子生成器直接的通道
            try:
                _s = yield _y        # 5 产出子生成器当前产出的元素;等待调用方发送_s中保存的值。
                                     # throw与close也从这里传进来
            except GeneratorExit as _e:    # 捕获关闭的信号
                try:
                    _m = _i.close     # 首先想到的是查看子生成器是否有该属性
                except AttributeError:
                    pass         # 没有就不操作子生成器
                else:
                    _m()    # 如果调用子生成器close() 方法
                raise _e         # 子生成器正常关闭,把这个关闭信号传递给委派生成器,委派生成器也关了,通道也关了。
            except BaseException as _e: # 捕获调用者的throw,里面的异常要给子生成器。
                _x = sys.exc_info()   # 保存错误信息
                try:
                    _m = _i.throw
                except AttributeError:   # 子生成器一迭代器,没有throw属性
                    raise _e        # 上浮错误还给委派生成器
                else: 
                    try:
                        _y = _m(*_x)     # 让委派生成器取throw扔出这个错误
                    except StopIteration as _e:    # 根据我前面的检查,throw进来的所有信息都会上浮到调用者包括throw(StopIteration)
                        # 但子生成器自身发出的StopIteration信息应该通过其属性返回值
                        _r = _e.value
                        break
            else:                # 9 如果产出值时没有异常
                try:             # 10 尝试让子生成器向前执行
                    if _s is None:       # 11. 如果调用者发送的值是None,那么会调用子生成器的 __next__()方法。
                        _y = next(_i)     # 赋值给_y通过循环到循环的开始yield_y输出给调用者
                    else:
                        _y = _i.send(_s)    # 11. 如果发送的值不是None,把传入的值给子生成器用,并赋值给_y通过循环到循环的开始yield_y输出给调用者。
                except StopIteration as _e: # 如果调用的方法抛出StopIteration异常,获取子生成器的return,赋值给_r, 退出循环,委派生成器恢复运行。
                    _r = _e.value
                    break
                    
                    
                    
    RESULT = _r #13 返回的结果是 _r 

     上面是完整版的逻辑,从别人处参考,但基本改了很多,我觉的他解释的是错误的。

    从这个整体逻辑来看,委派生成器内部会对每一个调用者与子生成器互动的信号进行判断,然后根据信号的不同,执行不同的操作。

    当传输关闭信号的时候:先尝试关闭子生成器,然后把自己给关了,真的一个贴心宝宝

    当有错误时BaseException,是错误祖宗,委派生成器捕获到throw进来的错误时候,会将错误保存复制后,传递给子生成器。

    上面两个yield from的代码我是抄袭书上的,也花了我几天的时候研究,测试。

    就我自己的感受yield from就是一个高度封装了很多逻辑,只有一个yield的函数。

    16.9使用案例:使用协程做离散时间仿真

    如果我能够理解,我独立开辟一篇随笔给自己做笔记。

  • 相关阅读:
    还在使用golang 的map 做Json编码么?
    Golang 性能测试(2) 性能分析
    golang 性能测试 (1) 基准性能测试
    消息队列 NSQ 源码学习笔记 (五)
    消息队列 NSQ 源码学习笔记 (四)
    消息队列 NSQ 源码学习笔记 (三)
    消息队列 NSQ 源码学习笔记 (二)
    消息队列 NSQ 源码学习笔记 (一)
    你不知道的空格
    Supervisor 使用和进阶4 (Event 的使用)
  • 原文地址:https://www.cnblogs.com/sidianok/p/12162745.html
Copyright © 2011-2022 走看看