zoukankan      html  css  js  c++  java
  • 流畅的python,Fluent Python 第十六章笔记 (协程)

    顶格标注,初学者看这个一定要记住这么几个概念:

    1、next(x)与s.send(None)的一样的效果,需要给生成器send消息,必须先执行前面的命令,让生成器预激,就是走到能够互动给值的地方。

    2、yield from x   x是可迭代对象既可,因为它首相会先对x进行iter(x)返回的副本(迭代器是自身,可迭代对象是创建一个自身的迭代器返回)进行操作。

    3、send就是比next多了一个发送给生成器暂停位置值的作用,所以send一次,必须会促使迭代器向下走一次。初学者看字面会觉的是发送消息,其实如果生成器yield有值产出,也能接收。

    send这个词汇不好,应该取一个send_next更加合理。

    4、协程只需要在send的时候进行预激,另外throw,send,close,都不需要预激。

    给自己的记号:

    生成器一般仍什么错误,上浮什么错误,但扔StopIteration

    上浮RuntimeError!!!

    In [590]: i.throw(StopIteration)                                                                     
    ---------------------------------------------------------------------------
    StopIteration                             Traceback (most recent call last)
    <ipython-input-588-e668612914cb> in <genexpr>(.0)
    ----> 1 i= (i for i in range(10))
    
    StopIteration:
    
    The above exception was the direct cause of the following exception:
    
    RuntimeError                              Traceback (most recent call last)
    <ipython-input-590-6828ce324924> in <module>
    ----> 1 i.throw(StopIteration)
    
    RuntimeError: generator raised StopIteration
    

     

    协程是指一个过程,这个过程与调用方协作,产出由调用方提供的值。

    16.2 用作协程的生成器的基本行为

    协程可以通过inspect.getgenerationstate(...)来返回协程状态

    'GEN_CREATED'  等待开始执行

    'GEN_RUNNING'  解释器正在执行

    'GEN_SUSPENDED'  在yield表达式暂停

    'GEN_CLOSE' 执行结束

    协程在执行send(xxx)具体内容,必须预激协程,可以通过next(gen)或gen.send(None)

    In [473]: def simple_coro2(a): 
         ...:     print('-> Started: a =', a) 
         ...:     b = yield a 
         ...:     print('-> Received: b =', b) 
         ...:     c = yield a + b 
         ...:     print('-> Received: c =', c) 
         ...:                                                                                            
    
    In [474]: my_core2 = simple_coro2(14)            # 生成generator                                                    
    
    In [475]: from inspect import getgeneratorstate                                                      
    
    In [476]: getgeneratorstate(my_core2)                                                                
    Out[476]: 'GEN_CREATED'
    
    In [477]: next(my_core2)                            # 预激协程                                                 
    -> Started: a = 14
    Out[477]: 14
    
    In [478]: getgeneratorstate(my_core2)                                                                
    Out[478]: 'GEN_SUSPENDED'
    
    In [479]: my_core2.send(28)                               # 发送数据                                           
    -> Received: b = 28
    Out[479]: 42
    
    In [480]: my_core2.send(99)                                  # 发送数据                                        
    -> Received: c = 99
    ---------------------------------------------------------------------------
    StopIteration                             Traceback (most recent call last)
    <ipython-input-480-f1e922706109> in <module>
    ----> 1 my_core2.send(99)
    
    StopIteration: 
    
    In [481]: getgeneratorstate(my_core2)                                                                
    Out[481]: 'GEN_CLOSED'
    

     其实这种b = yield a可以理解为一个双向通道,当执行,只不过这个双向通道是分开执行的。

    当停滞在yield a的时候,已经生产出a了,这个时候你如果执行send,可以给b对象,并且生成器向下执行到下一个yield xxx产出。

    所以,如果下一个yield后面有跟对象产出,这个send就是想送一个东西进去,然后产出一个东西出来。

    16.3 示例:使用协程计算移动平均值

    In [482]: def averager(): 
         ...:     total = 0.0 
         ...:     count = 0 
         ...:     average = None 
         ...:     while True: 
         ...:         term = yield average 
         ...:         total += term 
         ...:         count += 1 
         ...:         average = total / count 
         ...:                                                                                            
    
    In [483]: coro = average_gen()                                                                       
    
    In [484]: coro.send(None)                # 预激协程                                                               
    Out[484]: 0
    
    In [485]: coro = average_gen()                                                                       
    
    In [486]: next(coro)                      # 另外一种预激协程                                                           
    Out[486]: 0
    
    In [487]: coro.send(10)                                                                              
    Out[487]: 10.0
    
    In [488]: coro.send(20)                                                                              
    Out[488]: 15.0
    
    In [489]: coro.send(30)                                                                              
    Out[489]: 20.0
    

    16.4预激协程的装饰器

    先上装饰器

    from functools import wraps
    
    def coroutine(func):
        @wraps(func)
        def primer(*args, **kwargs):
            gen = func(*args, **kwargs)
            next(gen)     # 预激协程
            return gen
        return primer
    
    from coroutil import coroutine
    
    
    @coroutine
    def averager():
        total = 0.0
        count = 0
        average = None
        while True:
            term = yield average
            total += term
            count += 1
            average = total / count
    
    
    
    if __name__ == '__main__':
        aver = averager()
        print(aver.send(10))
        print(aver.send(20))
        print(aver.send(30))
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/Fluent_Python/第十六章/coroaverager0.py
    10.0
    15.0
    20.0
    
    Process finished with exit code 0
    

    16.5终止协程和异常处理

    next与close不需要预激协程,throw与send需要预激协程

    from inspect import getgeneratorstate
    
    class DemoException(Exception):
        """演示用的"""
    
    def demo_exc_handling():
        print('-> coroutine started')
        count = 0
        while True:
            try:
                x = yield f'info->({count})'    # 测试定位用
            except DemoException:
                print("*** DemoException handled. Continuing...")
            else:
                print(f'-> coroutine received:{x!r}')
            count += 1
    
    
    def throw():
        exc_coro = demo_exc_handling()
        print(exc_coro.send(None))     # 预激协程
        print(exc_coro.throw(DemoException))
        print(getgeneratorstate(exc_coro))
    
    def close():
        exc_coro = demo_exc_handling()
        print(exc_coro.send(None))
        print(exc_coro.close())     # 没有返回值,不需要预激携程
        print(getgeneratorstate(exc_coro))
    
    
    if __name__ == '__main__':
        throw()
        print('='*20)
        close()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/Fluent_Python/第十六章/coro_exc_demo.py
    -> coroutine started
    info->(0)
    *** DemoException handled. Continuing...
    info->(1)
    GEN_SUSPENDED
    ====================
    -> coroutine started
    info->(0)
    None
    GEN_CLOSED
    
    Process finished with exit code 0
    

    16.6让协程返回值

    In [492]: from collections import namedtuple 
         ...:  
         ...: Result = namedtuple('Result', 'count average') 
         ...:  
         ...:  
         ...: def averager(): 
         ...:     total = 0.0 
         ...:     count = 0 
         ...:     average = None 
         ...:     while True: 
         ...:         term = yield 
         ...:         if term is None:    # 判断输入是否为None 
         ...:             break 
         ...:         total += term 
         ...:         count += 1 
         ...:         average = total / count 
         ...:     return Result(count, average)  # 返回StopIteration 
         ...:                                                                                            
    
    In [493]: aver = averager()                                                                          
    
    In [494]: next(aver)                                                                                 
    
    In [495]: aver.send(10)                                                                              
    
    In [496]: aver.send(20)                                                                              
    
    In [497]: aver.send(None)                                                                            
    ---------------------------------------------------------------------------
    StopIteration                             Traceback (most recent call last)
    <ipython-input-497-19d1f05a0674> in <module>
    ----> 1 aver.send(None)
    
    StopIteration: Result(count=2, average=15.0)
    

     前面14章我自己理解关于生成器碰到return或者没有yield生成的时候,(20年2月28日补充日记,其实就是找不到下一个yield的时候)会上报StopIteration,但这个return返回的对象尽然包含在StopIteration的属性里面。

    那就可以想办法取出来了.

    In [498]: aver = averager()                                                                          
    
    In [499]: aver.send(None)                                                                            
    
    In [500]: aver.send(10)                                                                              
    
    In [501]: aver.send(20)                                                                              
    
    In [502]: try: 
         ...:     aver.send(None) 
         ...: except StopIteration as s: 
         ...:     result = s 
         ...:                                                                                            
    
    In [503]: dir(result)                                                                                
    Out[503]: 
    ['__cause__',
     '__class__',
     '__context__',
     '__delattr__',
     '__dict__',
     '__dir__',
     '__doc__',
     '__eq__',
     '__format__',
     '__ge__',
     '__getattribute__',
     '__gt__',
     '__hash__',
     '__init__',
     '__init_subclass__',
     '__le__',
     '__lt__',
     '__ne__',
     '__new__',
     '__reduce__',
     '__reduce_ex__',
     '__repr__',
     '__setattr__',
     '__setstate__',
     '__sizeof__',
     '__str__',
     '__subclasshook__',
     '__suppress_context__',
     '__traceback__',
     'args',
     'value',
     'with_traceback']
    
    In [504]: res = result.value                                                                         
    
    In [505]: res                                                                                        
    Out[505]: Result(count=2, average=15.0)
    
    In [506]:                                                                                            
    

    16.7使用 yield from

    In [506]: def demo(): 
         ...:     yield from 'abcd' 
         ...:     yield from range(10) 
         ...:      
         ...:                                                                                            
    
    In [507]: list(demo())                                                                               
    Out[507]: ['a', 'b', 'c', 'd', 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    
    In [508]:  
    

     yield from x 表达式对x对象所做的第一件事是,调用iter(x),从中获取迭代器。因此x可以是任何可迭代对象。

    yield from的主要功能是打开双向通道,把最外层的调用方法与最内层的子生成器链接起来,这样二者可以直接发送和产出值,还可以直接传入异常,而不用在位于中间的协程中添加大量处理异常的样板代码。

    委派生成器:

    包含yield from <iterable> 表达式的生成器函数

    我的理解方式是:委派生成器就像农夫山泉的广告,它其实不生产元素,它只不过是元素的搬运工,提供了一个管理。

    当然你可以在生成器函数里面执行一些代码,但也就预激的时候运行一下,后面加入想获取子生成器的return的时候会再执行一次next。

    委派生成器除了能够获取出子生成器的StopIteration里面的return 的result,在调用方与子生成器的互动中,不能获取双法发送的信息。

    子生成器:

    从yield from表达式中<iterable>部分获取的生成器,这就是PEP380的标题中所有的"子生成器"

    这个才是真正的底层工作的生成器,它里面有一个yield在辛苦的坐着底层劳动,然后通过委派生成器与最外面的调用方互动。

    调用方

    PEP380使用"调用方"这个术语指代调用委托生成的客户端代码。

    这个主要是与子生成器互动的,根据需求,可以直接获得子生成器yield出来的值,也可以send给子生成器值,发送结束命令后,获取res结果。

    from collections import namedtuple
    
    Result = namedtuple('Result', 'count average')
    
    # 子生成器
    def averager():
        total = 0.0
        count = 0
        average = None
        while True:
            term = yield
            if term is None:    # 判断输入是否为None
                break
            total += term
            count += 1
            average = total / count
        return Result(count, average)  # 返回StopIteration
    
    def second():      # 自己添加的二层委派生成器
        results = yield from averager()    # 获取最终的子生成器的res
        return results            # 获取以后直接通过StopIteration的value传出结果
    
    
    
    # 委派生成器
    def grouper(results, key):
        # while True:      # 书中这里用了while True当然这是非常好的,网上一堆代码也都这样写。
        # 有人提问为什么要while True至少百度没有啥正确答案,通过重复测试。
        # result在获得值以后grouper被重新激活,按照生成器的原理会继续向下走,但去掉了while向下走。
        # 会产生函数默认返回值return None,同时委派生成器的StopIteration报错。
        # 我这里选择加再加一个yield,这也可以避免StopIteration报错,书中的while方式当然也可以
        # 但感觉没有添加一个while快,因为再次进去 yield from averager()会再次创建一个averager()子生成器
        # 但调用方后续新建一个results[key] 与 averager()子生成器,前面刚刚新建的averager()子生成器被垃圾回收。
        # 所以再添加一个yield可以避免发生空余新建averager()子生成器,当然你也可以在调用方设置try,except来捕获异常,但不用任何操作。
        results[key] = yield from second()
        yield
    
    
    # 客户端代码,既调用方
    def main(data):
        results = {}    # 为了捕获委派生成器的结果,定义一个可变参数接收委派生成器的产出结果,是一个非常不错的主意。
        for key, values in data.items():
            # group 是调用grouper函数得到的生成器对象,传给grouper 函数的第一个参数是results,用于收集结果;第二个是某个键
            group = grouper(results, key)  # 给委派生成器传递可变参数字典,与key
            next(group)     # 这个是预激委派生成器,后面所有的委派生成器或者子生成器都会自动被预激
            for value in values:
                # 通过grouper 把value传入的值到达averager函数中;
                # grouper并不知道传入的是什么,同时grouper实例在yield from处暂停
                group.send(value)
            # 把None传入groupper,传入的值最终到达averager函数中,导致当前实例终止。然后继续创建下一个实例。
            # 如果没有group.send(None),那么averager子生成器永远不会终止,委派生成器也永远不会在此激活,也就不会为result[key]赋值
            group.send(None)
            # 子生成器停止并返回值的时候,委派生成器激活,并向下或者在循环体内执行。
        report(results)
    
    
    # 输出报告
    def report(results):
        for key, result in sorted(results.items()):
            group, unit = key.split(';')
            print('{:2} {:5} averaging {:.2f}{}'.format(result.count, group, result.average, unit))
    
    
    data = {
        'girls;kg':[40, 41, 42, 43, 44, 54],
        'girls;m': [1.5, 1.6, 1.8, 1.5, 1.45, 1.6],
        'boys;kg':[50, 51, 62, 53, 54, 54],
        'boys;m': [1.6, 1.8, 1.8, 1.7, 1.55, 1.6],
    }
    
    if __name__ == '__main__':
        main(data)
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/Fluent_Python/第十六章/coroaverager3.py
     6 boys  averaging 54.00kg
     6 boys  averaging 1.68m
     6 girls averaging 44.00kg
     6 girls averaging 1.58m
    
    Process finished with exit code 0
    

     注释已经很仔细了。

    如果你不需要拿回子生成器的return(就是子生成器函数不能停下来,因为一旦停下来,函数就会有返回值,就算你没设置也会返回None),

    只想拿子生成器产出的值,由于没有return,所以这个子生成器只能是一个无线迭代的生成器。

    In [537]: def c_gen(): 
         ...:     i = 0 
         ...:     while True: 
         ...:         yield i 
         ...:         i += 1 
         ...:                                                                                            
    
    In [538]: def wei1(): 
         ...:     yield from c_gen() 
         ...:                                                                                            
    
    In [539]: def wei2(): 
         ...:     yield from wei1() 
         ...:                                                                                            
    
    In [540]: w = wei2()                                                                                 
    
    In [541]: next(w)                                                                                    
    Out[541]: 0
    
    In [542]: next(w)                                                                                    
    Out[542]: 1
    
    In [543]: next(w)                                                                                    
    Out[543]: 2
    
    In [544]: def c_gen(): 
         ...:     return (i for i in range(4)) 
         ...:      
         ...:                                                                                            
    
    In [545]: w = wei2()                                                                                 
    
    In [546]: next(w)                                                                                    
    Out[546]: 0
    
    In [547]: list(w)                                                                                    
    Out[547]: [1, 2, 3]
    
    In [548]:  
    

     这是一个不获取子生成器return值的示例,通过两层委派生成器,执行最后的委派生成器,跟直接执行子生成器效果一样。

    这个就很好的显示了管道的力量。

    In [548]: def c_gen(): 
         ...:     return iter('abc') 
         ...:      
         ...:                                                                                            
    
    In [549]: w = wei2()                                                                                 
    
    In [550]: next(w)                                                                                    
    Out[550]: 'a'
    
    In [551]: w.send('a')                                                                                
    ---------------------------------------------------------------------------
    AttributeError                            Traceback (most recent call last)
    <ipython-input-551-77ce452bcf0e> in <module>
    ----> 1 w.send('a')
    
    <ipython-input-539-fc4381f76d91> in wei2()
          1 def wei2():
    ----> 2     yield from wei1()
          3 
    
    <ipython-input-538-91d3b9c6a7ef> in wei1()
          1 def wei1():
    ----> 2     yield from c_gen()
          3 
    
    AttributeError: 'str_iterator' object has no attribute 'send'
    

     最后我把子生成器返回一个迭代器,因为迭代器没有send方法,所以,就报错了,通过委派生成器层层传递到最外层。

    我把前面这些理解了,再取理解yield from 到底再做什么就方便多了。

    我先上书中的简化版的逻辑。

    后面又花了几个小时仔细研究了这个代码:这不就是只有一个yield的的生成器吗?只不过进行了一系列封装。

    RESULT = yield from EXPR
    
    
    # 模拟具体效果
    _i = iter(EXPR)    # 对EXPR进行iter操作,必须返回一个迭代器
    try:
        _y = next(_i)   # 为什么yield from的子生成器不用预激,就是这里帮你预激了
    except StopIteration as _e:   # 取出第一次预激就StopIteration,那准备接收返回值,也没必要yield
        _r = _e.value   # 拿出返回值
    else:        #20年2月29日补充,这个才是真正实现管道的地方,委派生成器确实可以预激活子生成器,但其后面的运行又会产生一个生成器,所以在主函数与子生成器进行交换的时候,还是要先预激活委派生成器
        while 1:       # 这里就是为什么yield from 能够取代for循环的原因了(运行这个循环时,委派生成器会阻塞,只作为调用方与子生成器之间的通道)。
            _s = yield _y     # 产出前面预激的时候获取的值给调用者,这里面yield暂停(进入了管道里面),等待调用者发送信息。
            try:
                _y = _i.send(_s)  # 将委派生成器接收到的_s发送给_s子生成器
            except StopIteration as _e:  # 如果接收到StopIteration,从子生成器中拿出返回值
                _r = _e.value
                break
    RESULT = _r   # 赋值给他 是不是发现少了什么,少了一个yield
    

     已经讲的很啰嗦了。

    • _i 迭代器(子生成器)

    • _y 产出的值 (子生成器产出的值)

    • _r 结果 (最终的结果 即整个yield from表达式的值)

    • _s 发送的值 (调用方发给委派生成器的值,这个只会传给子生成器)

    • _e 异常 (异常对象)

    上面的例子是子生成器经过iter(x)以后必定有send方法,但并不是所有的迭代器独有send方法,而且很多报错的逻辑上浮也没写。

    上一个书中的终极版本:

    RESULT = yield from EXPR
    
    
    # EXPR 可以是任何可迭代对象,因为获取迭代器_i 使用的是iter()函数。
    _i = iter(EXPR)    # 对EXPR进行iter操作,必须返回一个迭代器
    try:
        _y = next(_i) #    预激字生成器,结果保存在_y 中,作为第一个产出的值
    except StopIteration as _e:     # 3 如果调用的方法抛出StopIteration异常,获取异常对象的value属性,赋值给_r
        _r = _e.value
    else:
        while 1:             # 4 运行这个循环时,委派生成器会阻塞,只能作为调用方和子生成器直接的通道
            try:
                _s = yield _y        # 5 产出子生成器当前产出的元素;等待调用方发送_s中保存的值。
                                     # throw与close也从这里传进来
            except GeneratorExit as _e:    # 捕获关闭的信号
                try:
                    _m = _i.close     # 首先想到的是查看子生成器是否有该属性
                except AttributeError:
                    pass         # 没有就不操作子生成器
                else:
                    _m()    # 如果调用子生成器close() 方法
                raise _e         # 子生成器正常关闭,把这个关闭信号传递给委派生成器,委派生成器也关了,通道也关了。
            except BaseException as _e: # 捕获调用者的throw,里面的异常要给子生成器。
                _x = sys.exc_info()   # 保存错误信息
                try:
                    _m = _i.throw
                except AttributeError:   # 子生成器一迭代器,没有throw属性
                    raise _e        # 上浮错误还给委派生成器
                else: 
                    try:
                        _y = _m(*_x)     # 让委派生成器取throw扔出这个错误
                    except StopIteration as _e:    # 根据我前面的检查,throw进来的所有信息都会上浮到调用者包括throw(StopIteration)
                        # 但子生成器自身发出的StopIteration信息应该通过其属性返回值
                        _r = _e.value
                        break
            else:                # 9 如果产出值时没有异常
                try:             # 10 尝试让子生成器向前执行
                    if _s is None:       # 11. 如果调用者发送的值是None,那么会调用子生成器的 __next__()方法。
                        _y = next(_i)     # 赋值给_y通过循环到循环的开始yield_y输出给调用者
                    else:
                        _y = _i.send(_s)    # 11. 如果发送的值不是None,把传入的值给子生成器用,并赋值给_y通过循环到循环的开始yield_y输出给调用者。
                except StopIteration as _e: # 如果调用的方法抛出StopIteration异常,获取子生成器的return,赋值给_r, 退出循环,委派生成器恢复运行。
                    _r = _e.value
                    break
                    
                    
                    
    RESULT = _r #13 返回的结果是 _r 

     上面是完整版的逻辑,从别人处参考,但基本改了很多,我觉的他解释的是错误的。

    从这个整体逻辑来看,委派生成器内部会对每一个调用者与子生成器互动的信号进行判断,然后根据信号的不同,执行不同的操作。

    当传输关闭信号的时候:先尝试关闭子生成器,然后把自己给关了,真的一个贴心宝宝

    当有错误时BaseException,是错误祖宗,委派生成器捕获到throw进来的错误时候,会将错误保存复制后,传递给子生成器。

    上面两个yield from的代码我是抄袭书上的,也花了我几天的时候研究,测试。

    就我自己的感受yield from就是一个高度封装了很多逻辑,只有一个yield的函数。

    16.9使用案例:使用协程做离散时间仿真

    如果我能够理解,我独立开辟一篇随笔给自己做笔记。

  • 相关阅读:
    avcodec_open2()分析
    CentOS 6.9 下安装DB2
    使用python操作mysql数据库
    python之tcp自动重连
    决策树算法
    文件夹自动同步工具
    KNN算法介绍
    go语言生成uuid
    golang之log rotate
    golang之tcp自动重连
  • 原文地址:https://www.cnblogs.com/sidianok/p/12162745.html
Copyright © 2011-2022 走看看