  • Python asyncio coroutines

    asyncio

    Coroutines

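    A coroutine can be thought of as several tasks making progress at the same time within a single thread. For example, when function A calls function B, A normally has to wait until B has finished and returned a value before it can continue. With coroutines, after A calls B, B can be suspended before it finishes and control can switch back to A. In other words, the program can pause one piece of work internally to do something else, switching back and forth repeatedly. It looks like multithreading, but it actually runs in a single thread.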

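    Under the hood, coroutines build on the generator's yield operation. In a generator, reaching yield suspends the function and hands a value back; the next time the generator is resumed, execution continues from where it stopped. Python 3.3 added the yield from expression, which can be followed by another generator (or any iterable), so one generator can suspend and delegate to another. In asyncio, await plays roughly the role of yield from.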
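    To make the yield-based idea concrete, here is a toy round-robin scheduler built only from plain generators. It is a minimal sketch for illustration, not how asyncio is actually implemented; the names step, worker and run_round_robin are hypothetical. Each yield is a point where a generator hands control back, and yield from delegates to a nested generator, much as await does.

```python
from collections import deque


def step(name, i, log):
    # A nested generator: records one step, then gives up control once.
    log.append(f"{name} step {i}")
    yield


def worker(name, steps, log):
    # Delegates each step to the nested generator, like `await` delegating to a coroutine.
    for i in range(steps):
        yield from step(name, i, log)


def run_round_robin(generators):
    # Resume each generator in turn until all are exhausted, all in one thread.
    queue = deque(generators)
    while queue:
        gen = queue.popleft()
        try:
            next(gen)          # run until the next yield
        except StopIteration:
            continue           # finished: drop it
        queue.append(gen)      # not finished: put it back at the end


log = []
run_round_robin([worker("A", 2, log), worker("B", 2, log)])
print(log)  # ['A step 0', 'B step 0', 'A step 1', 'B step 1']
```

    The two workers interleave even though nothing runs in parallel: each one only advances when the scheduler resumes it.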

    See the third item in the appendix for details.

    Basic coroutines

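    1. Declare a coroutine function with async def
    2. Create an event loop and add the coroutine object to it to run
    async def test():  # declare a coroutine function named test
    asyncio.ensure_future(obj)  # wrap a coroutine object into a future (a Task)
    asyncio.gather(coroutines/futures)  # bundle coroutines or futures into a single future
    asyncio.wait([task1,task2])  # wait for tasks/futures to complete; returns a coroutine (bare coroutines are rejected since Python 3.11)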
    
    import asyncio
    
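    async def test():  # a coroutine function is defined with async def
        print('1')
        await asyncio.sleep(2)  # simulate IO; awaitables include coroutine objects, Futures and Tasks
        print('3')
    
    t = test()  # does not run test(); t is just a coroutine object
    
    asyncio.run(t)  # entry point, available since Python 3.7; roughly equivalent to the three lines below
    
    # loop = asyncio.new_event_loop()  # create a new event loop
    # loop.run_until_complete(t)  # run the coroutine object in the event loop until it completes
    # loop.close()  # closing is optional, but once the loop is closed, run_until_complete() can no longer be called on it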
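    Two details noted above can be checked directly: calling a coroutine function only creates a coroutine object without running its body, and a coroutine object can be run only once. A small sketch; the calls list and second_error variable exist only for observation.

```python
import asyncio

calls = []
second_error = None


async def test():
    calls.append("ran")
    await asyncio.sleep(0)
    return "done"


t = test()             # only creates a coroutine object; the body has not run yet
assert calls == []
print(asyncio.run(t))  # done
try:
    asyncio.run(t)     # running the same coroutine object a second time fails
except RuntimeError as exc:
    second_error = exc
    print("RuntimeError:", exc)
```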
    
    

    Awaitable objects

    Coroutines

    import asyncio
    
    async def nested():
        return 42
    
    async def main():
        print(await nested())  # will print "42".
    
    asyncio.run(main())
    

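    Tasks

    When a coroutine is wrapped into a Task by a function such as asyncio.create_task(), the coroutine is automatically scheduled to run soon. Task is a subclass of Future: asyncio.Task inherits all of Future's APIs except Future.set_result() and Future.set_exception():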

    import asyncio
    
    
    async def test():  # a coroutine function is defined with async def
        print('1')
        await asyncio.sleep(2)  # simulate IO; awaitables include coroutine objects, Futures and Tasks
        print('3')
        return 'Finished.'
    
    async def test2():
        # Note: a task is added to the event loop as soon as it is created
        task1 = asyncio.create_task(test())
        task2 = asyncio.create_task(test())
        # At this point the event loop holds: test2(), test(), test()
        
        ret1 = await task1
        ret2 = await task2
        print(ret1, ret2)
    
    asyncio.run(test2())  # 1. create an event loop 2. add test2() to it 3. run everything in the loop
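    Because both tasks are scheduled as soon as they are created, their sleeps overlap. A small timing sketch (the work and timed_main names and the 0.2 s delay are made up for the demo) suggests that two 0.2-second tasks finish in roughly 0.2 seconds in total, not 0.4:

```python
import asyncio
import time


async def work(delay):
    await asyncio.sleep(delay)  # yields to the event loop while "waiting for IO"
    return delay


async def timed_main(delay=0.2):
    start = time.perf_counter()
    # Both tasks start running as soon as they are created.
    task1 = asyncio.create_task(work(delay))
    task2 = asyncio.create_task(work(delay))
    results = [await task1, await task2]
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(timed_main())
print(results, f"{elapsed:.2f}s")  # about 0.2 s, because the two sleeps overlap
```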
    
    

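    Future objects

    A Future is a special low-level awaitable object that represents the eventual result of an asynchronous operation.

    When a Future object is awaited, the coroutine waits until the Future has been resolved somewhere else.

    Future objects are needed in asyncio to allow callback-based code to be used with async/await.

    Normally there is no need to create Future objects in application-level code.

    Future objects are sometimes exposed to users by libraries and some asyncio APIs, and they can be awaited. Their main methods: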

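    Method                                         Description
    set_result(result)                             Mark the Future as done and set its result.
    set_exception(exception)                       Mark the Future as done and set an exception.
    done()                                         Return True if the Future is done.
    cancelled()                                    Return True if the Future was cancelled.
    add_done_callback(callback, *, context=None)   Add a callback to be run when the Future is done.
    remove_done_callback(callback)                 Remove callback from the callback list.
    result()                                       Return the result of the Future (re-raises the exception if one was set).
    cancel(msg=None)                               Cancel the Future and schedule its callbacks.
    exception()                                    Return the exception that was set on the Future.
    get_loop()                                     Return the event loop the Future is bound to.
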
    import asyncio
    
    async def set_after(fut, delay, value):
        await asyncio.sleep(delay)
    
        # set the future's result
        fut.set_result(value)
    
    def cb(fut):  # a future's callback receives the future as its only argument
        print("!!!")
    
    async def main():
        loop = asyncio.get_running_loop()
    
        # create a Future object
        fut = loop.create_future()
        fut.add_done_callback(cb)  # register the callback
        loop.create_task(set_after(fut, 3, 'Hello world'))
    
        # wait until the future has a result, then print it
        print(await fut)
    
    asyncio.run(main())
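    The point about callback-based code can be sketched as follows. legacy_fetch is a hypothetical callback-style API (simulated here with threading.Timer) that calls callback(result) from another thread. The wrapper creates a Future with loop.create_future() and resolves it through loop.call_soon_threadsafe(), because Future methods are not thread-safe; the caller can then simply await the result.

```python
import asyncio
import threading


def legacy_fetch(key, callback):
    # Hypothetical callback-style API: delivers its result later, from a timer thread.
    threading.Timer(0.1, callback, args=(f"value-for-{key}",)).start()


async def fetch(key):
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    def on_done(result):
        # Runs in the timer thread, so hand the result over to the event loop thread.
        loop.call_soon_threadsafe(fut.set_result, result)

    legacy_fetch(key, on_done)
    return await fut  # suspends until on_done() fires


value = asyncio.run(fetch("abc"))
print(value)  # value-for-abc
```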
    
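    asyncio.gather() runs several coroutines concurrently:

    import asyncio
    
    
    async def test(number):  # a coroutine function is defined with async def
        print(number)
        await asyncio.sleep(2)  # simulate IO; awaitables include coroutine objects, Futures and Tasks
    
    
    async def main():
        await asyncio.gather(test(1), test(2), test(3))  # gather() returns a future, which is awaitable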
    
    asyncio.run(main())
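    gather() also collects return values: awaiting it gives a list in the same order as the arguments, regardless of which finishes first. With return_exceptions=True, exceptions are placed in that list instead of being raised. A short sketch; the square and fail helpers are made up for illustration.

```python
import asyncio


async def square(n, delay):
    await asyncio.sleep(delay)
    return n * n


async def fail():
    await asyncio.sleep(0)
    raise ValueError("boom")


async def main():
    # The slower coroutine is listed first, yet results keep argument order.
    results = await asyncio.gather(square(3, 0.05), square(2, 0.01))
    # With return_exceptions=True the exception object ends up in the results list.
    mixed = await asyncio.gather(square(4, 0), fail(), return_exceptions=True)
    return results, mixed


results, mixed = asyncio.run(main())
print(results)  # [9, 4]
print(mixed)    # [16, ValueError('boom')]
```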
    

    Timeouts

    Set a timeout for waiting on a coroutine with asyncio.wait_for():

    import asyncio
    
    async def eternity():
        # sleep 2s
        await asyncio.sleep(2)
        print('yay!')
    
    async def main():
        # set a 10-second timeout
        try:
            await asyncio.wait_for(eternity(), timeout=10)
        except asyncio.TimeoutError:
            print('timeout!')
    
    asyncio.run(main())
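    In the example above the coroutine finishes well within the limit. When the limit is exceeded, wait_for() cancels the inner coroutine and then raises TimeoutError. A sketch with made-up timings; the slow coroutine records whether it saw the cancellation in the events list.

```python
import asyncio

events = []


async def slow():
    try:
        await asyncio.sleep(10)
        events.append("finished")
    except asyncio.CancelledError:
        events.append("cancelled")  # wait_for() cancels the coroutine on timeout
        raise                       # re-raise so the cancellation completes normally


async def main():
    try:
        await asyncio.wait_for(slow(), timeout=0.1)
    except asyncio.TimeoutError:
        return "timeout"
    return "ok"


outcome = asyncio.run(main())
print(outcome, events)  # timeout ['cancelled']
```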
    

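    Waiting

    asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)
    

    Waits on the awaitables in the iterable aws, which must not be empty, and returns two sets of Tasks/Futures: (done, pending). The loop parameter was removed in Python 3.10, and since Python 3.11 aws must contain Tasks or Futures rather than bare coroutines.

    return_when indicates when this function should return. It must be one of the following constants: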

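    Constant          Description
    FIRST_COMPLETED   Return when any awaitable finishes or is cancelled.
    FIRST_EXCEPTION   Return when any awaitable finishes by raising an exception. If none raises, this is equivalent to ALL_COMPLETED.
    ALL_COMPLETED     Return when all awaitables have finished or been cancelled.
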
    import asyncio
    
    async def foo(num):
        await asyncio.sleep(num)
        print("something")
        return 42
    
    async def main():
        task = asyncio.create_task(foo(1))
        task2 = asyncio.create_task(foo(5))
        done, pending = await asyncio.wait([task, task2], return_when=asyncio.FIRST_COMPLETED)
        # task2 is still in pending here; asyncio.run() cancels it when main() returns
    
        if task in done:
            print("Done.")
    
    asyncio.run(main())
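    asyncio.wait() does not cancel anything itself: whatever is left in pending keeps running until it is cancelled. A sketch using the timeout parameter; the sleeper helper and the delays are arbitrary.

```python
import asyncio


async def sleeper(delay):
    await asyncio.sleep(delay)
    return delay


async def main():
    fast = asyncio.create_task(sleeper(0.05))
    slow = asyncio.create_task(sleeper(5))
    # Returns after at most 0.2 s; unlike wait_for(), wait() raises no TimeoutError.
    done, pending = await asyncio.wait({fast, slow}, timeout=0.2)
    for task in pending:
        task.cancel()  # pending tasks keep running until cancelled
    # Let the cancelled tasks process their cancellation.
    await asyncio.gather(*pending, return_exceptions=True)
    return done, pending, fast, slow


done, pending, fast, slow = asyncio.run(main())
print(fast in done, slow in pending, slow.cancelled())  # True True True
```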
    

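    Running in threads

    asyncio.to_thread(func, /, *args, **kwargs)  # Python 3.9+
    

    Runs the blocking function func in a separate thread and returns a coroutine that can be awaited for its result.

    Example: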

    import asyncio
    import time
    
    def blocking_io(num=2):
        print(f"start blocking_io at {time.strftime('%X')}")
        # simulate a blocking IO operation
        time.sleep(num)
        print(f"blocking_io complete at {time.strftime('%X')}")
    
    async def main():
        print(f"started main at {time.strftime('%X')}")
        thread = asyncio.to_thread(blocking_io, num=4)  # coroutine that runs blocking_io in a worker thread
        await asyncio.gather(thread)
        print(f"finished main at {time.strftime('%X')}")
    
    asyncio.run(main())
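    The benefit of to_thread() is that the event loop stays free while the blocking function runs. A sketch (Python 3.9+; blocking and ticker are hypothetical names) that runs a blocking time.sleep() in a thread while a coroutine keeps ticking on the loop:

```python
import asyncio
import time


def blocking(seconds):
    time.sleep(seconds)  # blocks only the worker thread, not the event loop
    return "blocking done"


async def ticker(n, interval):
    ticks = 0
    for _ in range(n):
        await asyncio.sleep(interval)  # keeps running on the loop while blocking() sleeps
        ticks += 1
    return ticks


async def main():
    start = time.perf_counter()
    result, ticks = await asyncio.gather(
        asyncio.to_thread(blocking, 0.3),
        ticker(5, 0.05),
    )
    return result, ticks, time.perf_counter() - start


result, ticks, elapsed = asyncio.run(main())
print(result, ticks, f"{elapsed:.2f}s")  # roughly 0.3 s, not 0.3 + 0.25 s run back to back
```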
    

    Example (calling the third-party requests library through run_in_executor()):

    import asyncio
    import functools

    import requests


    async def crawler(url):
        print('Start crawling:', url)
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36'
        }
        # loop.run_in_executor() runs a blocking third-party call such as requests.get()
        # in a thread pool (None = the loop's default executor), so the loop is not blocked.
        # It only forwards positional arguments, so keyword arguments are bound with functools.partial.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(None, functools.partial(requests.get, url, headers=headers))
        print('Response received:', url)
        # Save the response body under the file name at the end of the URL
        with open(url.rsplit('/', 1)[-1], 'wb') as f:
            f.write(response.content)


    async def main():
        url_list = ['https://w.wallhaven.cc/full/57/wallhaven-57lzd8.jpg',
                    'https://w.wallhaven.cc/full/zm/wallhaven-zm5lyg.jpg',
                    'https://w.wallhaven.cc/full/28/wallhaven-28ey2g.jpg',
                    'https://w.wallhaven.cc/full/p8/wallhaven-p8gvvp.jpg']
        # Run all downloads concurrently (asyncio.wait() no longer accepts bare coroutines since 3.11)
        await asyncio.gather(*(crawler(url) for url in url_list))


    asyncio.run(main())
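    Because run_in_executor() only forwards positional arguments, keyword arguments have to be bound with functools.partial, as in the crawler. A network-free sketch of the same pattern; fake_download is a hypothetical stand-in for requests.get.

```python
import asyncio
import functools
import time


def fake_download(url, *, timeout):
    # Hypothetical blocking call standing in for requests.get(url, timeout=...).
    time.sleep(0.1)
    return f"{url} (timeout={timeout})"


async def main(urls):
    loop = asyncio.get_running_loop()
    futures = [
        # None selects the loop's default ThreadPoolExecutor;
        # partial() binds the keyword argument that run_in_executor() cannot pass.
        loop.run_in_executor(None, functools.partial(fake_download, url, timeout=5))
        for url in urls
    ]
    return await asyncio.gather(*futures)


pages = asyncio.run(main(["a", "b", "c"]))
print(pages)  # ['a (timeout=5)', 'b (timeout=5)', 'c (timeout=5)']
```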
    

    Introspection: the current task

    asyncio.current_task(loop=None) returns the currently running task, and asyncio.all_tasks(loop=None) returns the set of tasks that have not finished yet.

    import asyncio
    import time
    
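    async def blocking_io():
        time.sleep(3)  # blocks the whole event loop; used here only for demonstration
        return 10
    
    
    async def main():
    
        task = asyncio.create_task(blocking_io())
        ret = await task
        cur = asyncio.current_task()  # cur is the Task wrapping main()
        tasks = asyncio.all_tasks()  # only main() is left: the other task has already finished
        print(ret)
        print(cur)
        print(tasks)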
    
    asyncio.run(main())
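    all_tasks() only reports tasks that have not finished yet, which is why only main() shows up in the example. When it is called while other tasks are still pending, they appear too. A sketch; the task names passed through create_task(..., name=...) (Python 3.8+) are arbitrary.

```python
import asyncio


async def idle():
    await asyncio.sleep(0.1)


async def main():
    t1 = asyncio.create_task(idle(), name="idle-1")
    t2 = asyncio.create_task(idle(), name="idle-2")
    pending_names = {t.get_name() for t in asyncio.all_tasks()}  # t1, t2 and main() are unfinished
    current_name = asyncio.current_task().get_name()
    await asyncio.gather(t1, t2)
    after_names = {t.get_name() for t in asyncio.all_tasks()}  # only main() is left
    return current_name, pending_names, after_names


current_name, pending_names, after_names = asyncio.run(main())
print({"idle-1", "idle-2"} <= pending_names, after_names == {current_name})  # True True
```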
    

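    Task objects

    Task.cancel()

    To cancel a running Task, use the cancel() method. This makes the Task throw a CancelledError exception into the wrapped coroutine. If the coroutine is awaiting a Future object at the time of cancellation, that Future is cancelled as well.

    Example 1: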

    import asyncio
    
    async def cancel_me():
        print('cancel_me(): before sleep')
        try:
            await asyncio.sleep(3600)
        except asyncio.CancelledError:
            print('cancel_me(): cancel sleep')
            raise
        finally:
            print('cancel_me(): after sleep')
    
    async def main():
        task = asyncio.create_task(cancel_me())  # 2. schedule cancel_me() on the event loop
    
        task.cancel()  # 3. cancel the task before it has had a chance to start
        try:
            await task  # 4. the task was cancelled, so awaiting it raises CancelledError
        except asyncio.CancelledError:
            print("main(): cancel_me is cancelled now")  # 5. print a message
    
    asyncio.run(main())  # 1. run main()
    
    
    ########### result
    # main(): cancel_me is cancelled now
    # (cancel_me() never started, so none of its own prints appear)
    

    Example 2:

    Compared with Example 1, the only change is an extra await asyncio.sleep(1) before task.cancel():

    import asyncio
    
    async def cancel_me():
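        print('cancel_me(): before sleep')  # 4. print
        try:
            # 5. sleep suspends this task, and the loop runs the next ready task: main()
            # 8. execution resumes here; because the task was cancelled, the awaited sleep is cancelled too and CancelledError is raised
            await asyncio.sleep(3600)
        except asyncio.CancelledError:
            print('cancel_me(): cancel sleep')  # 9. print, then re-raise the exception
            raise
        finally:
            print('cancel_me(): after sleep')  # 10. print
    
    async def main():
        task = asyncio.create_task(cancel_me())  # 2. schedule cancel_me() on the event loop
    
        await asyncio.sleep(1)  # 3. simulated IO: suspends main() so the loop runs the next task, cancel_me()
    
        task.cancel()  # 6. cancel the task
        try:
            await task  # 7. wait for the task; cancel_me() resumes
        except asyncio.CancelledError:
            print("main(): cancel_me is cancelled now")  # 11. catch the exception and print a message
    
    asyncio.run(main())  # 1. run main()
    
    ########### result
    # cancel_me(): before sleep
    # cancel_me(): cancel sleep
    # cancel_me(): after sleep
    # main(): cancel_me is cancelled now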
    

    cancelled(), done(), result(), exception()

    Return whether the task was cancelled, whether it is done, its result, and its exception (None if it finished normally).

    import asyncio
    
    async def aa():
        return 10
    
    async def main():
        task = asyncio.create_task(aa())  # 2. schedule aa() on the event loop
        await task
        print(task.result(),task.done(),task.cancelled(),task.exception())
    
    asyncio.run(main())  # 1. run main()
    
    
    # ---------- result
    #: 10 True False None
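    When the wrapped coroutine raises instead, exception() returns the exception object and result() re-raises it. A sketch with a hypothetical bad() coroutine:

```python
import asyncio


async def bad():
    raise ValueError("bad input")


async def main():
    task = asyncio.create_task(bad())
    await asyncio.wait({task})  # waits for completion without raising the task's exception
    exc = task.exception()      # the ValueError instance stored in the task
    reraised = None
    try:
        task.result()           # re-raises the stored exception
    except ValueError as err:
        reraised = err
    return task.done(), task.cancelled(), exc, reraised


done, cancelled, exc, reraised = asyncio.run(main())
print(done, cancelled, repr(exc), exc is reraised)  # True False ValueError('bad input') True
```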
    

    add_done_callback()

    Adds a callback that runs when the task finishes; remove_done_callback() removes a previously added callback.

    import asyncio
    
    async def ten():
        return 10
    
    def cb(task):  # the callback receives the Task object as its only argument
        print("done.")
    
    async def main():
        task = asyncio.create_task(ten())
        task.add_done_callback(cb)  # register the callback
        await task
    
    asyncio.run(main())  # 1. run main()
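    Since the callback receives the finished task, it can read the result directly, and remove_done_callback() returns how many callbacks it removed. A sketch; the results list is only there to collect values for the demo.

```python
import asyncio

results = []


async def ten():
    return 10


def record(task):
    # Runs after the task is done, so task.result() is safe to call here.
    results.append(task.result())


def never(task):
    results.append("should not run")


async def main():
    task = asyncio.create_task(ten())
    task.add_done_callback(record)
    task.add_done_callback(never)
    removed = task.remove_done_callback(never)  # number of callbacks removed
    await task
    await asyncio.sleep(0)  # give any remaining scheduled callbacks a chance to run
    return removed


removed = asyncio.run(main())
print(removed, results)  # 1 [10]
```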
    

    get_name(), set_name(), get_coro()

    Get the task's name, set the task's name, and get the coroutine object wrapped by the task

    import asyncio
    
    async def ten():
        return 10
    
    def cb(task):  # a callback receives the Task object (unused in this example)
        print("done.")
    
    async def main():
        task = asyncio.create_task(ten())
        task.set_name("TEN")
        print(task.get_coro(),task.get_name())
        await task
    
    asyncio.run(main())  # 1. run main()
    
    # ======= result =====
    # <coroutine object ten at 0x03F6DCA8> TEN
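    Since Python 3.8 the name can also be given when the task is created, through the name keyword of create_task(). A short sketch:

```python
import asyncio


async def ten():
    return 10


async def main():
    task = asyncio.create_task(ten(), name="TEN")  # same effect as task.set_name("TEN")
    coro_name = task.get_coro().__name__           # name of the wrapped coroutine object
    value = await task
    return task.get_name(), coro_name, value


info = asyncio.run(main())
print(info)  # ('TEN', 'ten', 10)
```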
    

    Subprocesses

    1. Running shell commands

    asyncio.create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None)

    Similar to subprocess.Popen(args, shell=True).

    import asyncio
    
    
    async def run(cmd):
        proc = await asyncio.create_subprocess_shell(
            cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)
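        stdout, stderr = await proc.communicate()  # no input is sent to the child; communicate() just collects its output
        print(f'exit code {proc.returncode}')
        print(stdout.decode(errors='replace'))
    
    asyncio.run(run('ipconfig /all'))  # a Windows command; on Linux/macOS try e.g. 'ls -l'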
    
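    2. Running other programs

    asyncio.create_subprocess_exec(program, *args, stdin=None, stdout=None, stderr=None)

    Runs an executable directly, without a shell: program is the path of the executable and *args are its command-line arguments.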

    import asyncio
    import sys
    
    async def get_date():
        code = 'import datetime; print(datetime.datetime.now())'
        # sys.executable is the path of the current Python interpreter
        proc = await asyncio.create_subprocess_exec(
            sys.executable, '-c', code,
            stdout=asyncio.subprocess.PIPE)
    
        # Read one line of output.
        data = await proc.stdout.readline()
        line = data.decode('ascii').rstrip()
    
        # Wait for the subprocess exit.
        await proc.wait()
        return line
    
    date = asyncio.run(get_date())
    print(f"Current date: {date}")
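    The example above only reads the child's stdout. A portable sketch that also sends data to the child through communicate(input=...); it again runs the current interpreter with create_subprocess_exec, and the one-line child program is made up for the demo.

```python
import asyncio
import sys


async def upper_via_child(text):
    # Child program: read all of stdin and write it back in upper case.
    child_code = "import sys; sys.stdout.write(sys.stdin.read().upper())"
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", child_code,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    # communicate() writes the input, closes stdin, reads stdout and waits for exit.
    stdout, _ = await proc.communicate(input=text.encode())
    return proc.returncode, stdout.decode()


returncode, out = asyncio.run(upper_via_child("hello"))
print(returncode, out)  # 0 HELLO
```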
    
    3. Other attributes and methods of the Process object:
    wait()  # coroutine
    communicate(input=None)  # coroutine, so it is awaitable
    kill()
    terminate()
    send_signal(signal)
    stdin
    stdout
    stderr
    returncode
    pid
    

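    Queues

    asyncio.Queue(maxsize=0) works much like the ordinary queue.Queue, except that put() and get() are coroutines that must be awaited. The loop parameter was removed in Python 3.10.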
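    A minimal producer/consumer sketch with asyncio.Queue; the producer and consumer names and the None sentinel are illustrative choices, not part of the API. put() waits when the queue is full and get() waits when it is empty, without blocking the thread.

```python
import asyncio


async def producer(queue, n):
    for i in range(n):
        await queue.put(i)  # waits (without blocking the loop) while the queue is full
    await queue.put(None)   # sentinel: tells the consumer there is no more data


async def consumer(queue, out):
    while True:
        item = await queue.get()  # waits until an item is available
        queue.task_done()
        if item is None:
            break
        out.append(item * 10)


async def main():
    queue = asyncio.Queue(maxsize=2)  # small bound so the producer sometimes has to wait
    out = []
    await asyncio.gather(producer(queue, 5), consumer(queue, out))
    return out


collected = asyncio.run(main())
print(collected)  # [0, 10, 20, 30, 40]
```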

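    Synchronization

    asyncio.Lock(), asyncio.Event(), asyncio.Condition(), asyncio.Semaphore()

    Not covered in detail here; they work like the ordinary (threading) lock, event, condition and semaphore, but they are not thread-safe and their waiting methods are coroutines.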
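    As a sketch of how these resemble their threading counterparts: an asyncio.Semaphore limiting how many coroutines run a section at once, and an asyncio.Lock protecting a shared counter across an await. The limit of 2, the counts and the helper names are arbitrary; acquiring is done with async with.

```python
import asyncio

running = 0
peak = 0
counter = 0


async def limited(sem):
    global running, peak
    async with sem:  # at most 2 coroutines are inside this block at once
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)
        running -= 1


async def increment(lock):
    global counter
    async with lock:  # without the lock, the await below would let updates interleave
        value = counter
        await asyncio.sleep(0)
        counter = value + 1


async def main():
    sem = asyncio.Semaphore(2)
    lock = asyncio.Lock()
    await asyncio.gather(*(limited(sem) for _ in range(6)))
    await asyncio.gather(*(increment(lock) for _ in range(100)))


asyncio.run(main())
print(peak, counter)  # 2 100
```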

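    Event loop

    Function                          Meaning
    asyncio.get_running_loop()        Return the event loop running in the current OS thread (raises RuntimeError if there is none).
    asyncio.get_event_loop()          Get the current event loop (inside coroutines, prefer get_running_loop()).
    asyncio.set_event_loop(loop)      Set loop as the current event loop.
    asyncio.new_event_loop()          Create a new event loop.
    loop.run_until_complete(future)   Run until the future (or coroutine) has completed.
    loop.run_forever()                Keep running until loop.stop() is called.
    loop.stop()                       Stop the event loop.
    loop.close()                      Close the event loop; the loop must not be running when this is called.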
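    Using the functions in this table directly, part of what asyncio.run() does can be sketched by hand. This is simplified: asyncio.run() additionally cancels leftover tasks and shuts down async generators and the default executor.

```python
import asyncio


async def answer():
    await asyncio.sleep(0.01)
    return 42


loop = asyncio.new_event_loop()   # create a fresh event loop
asyncio.set_event_loop(loop)      # make it the current loop for this thread
try:
    result = loop.run_until_complete(answer())  # run until the coroutine is done
finally:
    asyncio.set_event_loop(None)
    loop.close()                  # the loop must not be running at this point

print(result, loop.is_closed())  # 42 True
```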

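    Event callbacks: call_soon(callback)

    call_soon(callback, *args) schedules callback(*args) to be called on the next iteration of the event loop. Put simply, the callback is called once, as early as possible, the next time the loop iterates; callbacks run in the order they were registered. It returns an asyncio.Handle instance that can be used to cancel the callback.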

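    import asyncio
    import time
    
    async def test1():
        await asyncio.sleep(6)
        print(asyncio.current_task().get_name())
        return 10
    
    def call(msg):
        time.sleep(3)  # a blocking call: it freezes the whole event loop for 3 seconds
        print(msg)
    
    async def main():
        await asyncio.gather(test1(), test1())
    
    
    loop = asyncio.new_event_loop()
    loop.call_soon(call, "called.")  # runs on the loop's next iteration, before main() starts
    loop.run_until_complete(main())
    loop.close()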
    

    call_later(delay,callback,*args)

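    Calls the callback once after delay seconds. Otherwise the same as call_soon().

    call_at(when, callback, *args)

    Calls the callback once at the absolute time when. Note that when is measured on the loop's own monotonic clock, loop.time(), not as a Unix timestamp. Otherwise the same as call_soon().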
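    A sketch of the three scheduling calls together. The when value for call_at() is computed from loop.time(), since that is the clock call_at() uses; the delays and labels are arbitrary. The handle returned by call_soon() is used to cancel one callback before it runs.

```python
import asyncio

order = []


async def main():
    loop = asyncio.get_running_loop()
    done = loop.create_future()

    loop.call_later(0.05, order.append, "call_later")          # about 50 ms from now
    loop.call_at(loop.time() + 0.02, order.append, "call_at")  # absolute time on the loop clock
    loop.call_soon(order.append, "call_soon")                  # next loop iteration
    handle = loop.call_soon(order.append, "cancelled")
    handle.cancel()                                            # this callback never runs
    loop.call_later(0.1, done.set_result, None)                # end of the demo

    await done


asyncio.run(main())
print(order)  # ['call_soon', 'call_at', 'call_later']
```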

    Appendix:

    1. Future, Task and Event loop

    Event Loop

    On any platform, when we want to do something asynchronously, it usually involves an event loop. An event loop is a loop that can register tasks to be executed, execute them, delay or even cancel them, and handle different events related to these operations. Generally, we schedule multiple async functions to the event loop. The loop runs one function; while that function waits for IO, the loop pauses it and runs another. When the first function completes IO, it is resumed. Thus two or more functions can co-operatively run together. This is the main goal of an event loop.

    The event loop can also pass resource-intensive functions to a thread pool for processing. The internals of the event loop are quite complex and we don't need to worry much about them right away. We just need to remember that the event loop is the mechanism through which we can schedule our async functions and get them executed.

    Futures / Tasks

    If you are into Javascript too, you probably know about Promise. In Python we have similar concepts – Future/Task. A Future is an object that is supposed to have a result in the future. A Task is a subclass of Future that wraps a coroutine. When the coroutine finishes, the result of the Task is realized.

    Coroutines

    We discussed Coroutines in our last blog post. It's a way of pausing a function and returning a series of values periodically. A coroutine can pause the execution of the function by using the yield, yield from or await (Python 3.5+) keywords in an expression. The function is paused until the yield statement actually gets a value.

    Fitting Event Loop and Future/Task Together

    It's simple. We need an event loop and we need to register our future/task objects with the event loop. The loop will schedule and run them. We can add callbacks to our future/task objects so that we are notified when a future has its result.

    Very often we choose to use coroutines for our work. We wrap a coroutine in a Future and get a Task object. When a coroutine yields, it is paused. When it has a value, it is resumed. When it returns, the Task has completed and gets a value, and any associated callback is run. If the coroutine raises an exception, the Task fails instead of resolving to a value.

    2. Combining Coroutines with Threads and Processes

    A lot of existing libraries are not ready to be used with asyncio natively. They may block, or depend on concurrency features not available through the module. It is still possible to use those libraries in an application based on asyncio by using an executor from concurrent.futures to run the code either in a separate thread or a separate process.

    Threads

    The run_in_executor() method of the event loop takes an executor instance, a regular callable to invoke, and any arguments to be passed to the callable. It returns a Future that can be used to wait for the function to finish its work and return something. If no executor is passed in, a ThreadPoolExecutor is created. This example explicitly creates an executor to limit the number of worker threads it will have available.

    A ThreadPoolExecutor starts its worker threads and then calls each of the provided functions once in a thread. This example shows how to combine run_in_executor() and wait() to have a coroutine yield control to the event loop while blocking functions run in separate threads, and then wake back up when those functions are finished.

    asyncio_executor_thread.py

    import asyncio
    import concurrent.futures
    import logging
    import sys
    import time
    
    
    def blocks(n):
        log = logging.getLogger('blocks({})'.format(n))
        log.info('running')
        time.sleep(0.1)
        log.info('done')
        return n ** 2
    
    
    async def run_blocking_tasks(executor):
        log = logging.getLogger('run_blocking_tasks')
        log.info('starting')
    
        log.info('creating executor tasks')
        loop = asyncio.get_running_loop()
        blocking_tasks = [
            loop.run_in_executor(executor, blocks, i)
            for i in range(6)
        ]
        log.info('waiting for executor tasks')
        completed, pending = await asyncio.wait(blocking_tasks)
        results = [t.result() for t in completed]
        log.info('results: {!r}'.format(results))
    
        log.info('exiting')
    
    
    if __name__ == '__main__':
        # Configure logging to show the name of the thread
        # where the log message originates.
        logging.basicConfig(
            level=logging.INFO,
            format='%(threadName)10s %(name)18s: %(message)s',
            stream=sys.stderr,
        )
    
        # Create a limited thread pool.
        executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=3,
        )
    
        try:
            asyncio.run(run_blocking_tasks(executor))
        finally:
            executor.shutdown()  # wait for the worker threads to finish, then release them
    

    asyncio_executor_thread.py uses logging to conveniently indicate which thread and function are producing each log message. Because a separate logger is used in each call to blocks(), the output clearly shows the same threads being reused to call multiple copies of the function with different arguments.

    $ python3 asyncio_executor_thread.py
    
    MainThread run_blocking_tasks: starting
    MainThread run_blocking_tasks: creating executor tasks
    ThreadPoolExecutor-0_0          blocks(0): running
    ThreadPoolExecutor-0_1          blocks(1): running
    ThreadPoolExecutor-0_2          blocks(2): running
    MainThread run_blocking_tasks: waiting for executor tasks
    ThreadPoolExecutor-0_0          blocks(0): done
    ThreadPoolExecutor-0_1          blocks(1): done
    ThreadPoolExecutor-0_2          blocks(2): done
    ThreadPoolExecutor-0_0          blocks(3): running
    ThreadPoolExecutor-0_1          blocks(4): running
    ThreadPoolExecutor-0_2          blocks(5): running
    ThreadPoolExecutor-0_0          blocks(3): done
    ThreadPoolExecutor-0_2          blocks(5): done
    ThreadPoolExecutor-0_1          blocks(4): done
    MainThread run_blocking_tasks: results: [0, 9, 16, 25, 1, 4]
    MainThread run_blocking_tasks: exiting
    

    Processes

    A ProcessPoolExecutor works in much the same way, creating a set of worker processes instead of threads. Using separate processes requires more system resources, but for computationally-intensive operations it can make sense to run a separate task on each CPU core.

    asyncio_executor_process.py

    # changes from asyncio_executor_thread.py
    
    if __name__ == '__main__':
        # Configure logging to show the id of the process
        # where the log message originates.
        logging.basicConfig(
            level=logging.INFO,
            format='PID %(process)5s %(name)18s: %(message)s',
            stream=sys.stderr,
        )
    
        # Create a limited process pool.
        executor = concurrent.futures.ProcessPoolExecutor(
            max_workers=3,
        )
    
        try:
            asyncio.run(run_blocking_tasks(executor))
        finally:
            executor.shutdown()  # wait for the worker processes to finish, then release them
    

    The only change needed to move from threads to processes is to create a different type of executor. This example also changes the logging format string to include the process id instead of the thread name, to demonstrate that the tasks are in fact running in separate processes.

    $ python3 asyncio_executor_process.py
    
    PID 40498 run_blocking_tasks: starting
    PID 40498 run_blocking_tasks: creating executor tasks
    PID 40498 run_blocking_tasks: waiting for executor tasks
    PID 40499          blocks(0): running
    PID 40500          blocks(1): running
    PID 40501          blocks(2): running
    PID 40499          blocks(0): done
    PID 40500          blocks(1): done
    PID 40501          blocks(2): done
    PID 40500          blocks(3): running
    PID 40499          blocks(4): running
    PID 40501          blocks(5): running
    PID 40499          blocks(4): done
    PID 40500          blocks(3): done
    PID 40501          blocks(5): done
    PID 40498 run_blocking_tasks: results: [1, 4, 9, 0, 16, 25]
    PID 40498 run_blocking_tasks: exiting
    

    3. "How does asyncio work?", answer by Bharel

    Before answering this question we need to understand a few base terms, skip these if you already know any of them.

    Generators

    Generators are objects that allow us to suspend the execution of a Python function. User-defined generators are implemented using the keyword yield. By creating a normal function containing the yield keyword, we turn that function into a generator:

    >>> def test():
    ...     yield 1
    ...     yield 2
    ...
    >>> gen = test()
    >>> next(gen)
    1
    >>> next(gen)
    2
    >>> next(gen)
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    StopIteration
    

    As you can see, calling next() on the generator causes the interpreter to load test's frame and return the yielded value. Calling next() again causes the frame to be loaded into the interpreter stack again, and execution continues until another value is yielded.

    By the third time next() is called, our generator is finished, and StopIteration is raised.

    Communicating with a generator

    A less-known feature of generators is that you can communicate with them using two methods: send() and throw().

    >>> def test():
    ...     val = yield 1
    ...     print(val)
    ...     yield 2
    ...     yield 3
    ...
    >>> gen = test()
    >>> next(gen)
    1
    >>> gen.send("abc")
    abc
    2
    >>> gen.throw(Exception())
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "<stdin>", line 4, in test
    Exception
    

    Upon calling gen.send(), the value is passed as a return value from the yield keyword.

    gen.throw(), on the other hand, allows throwing exceptions inside generators, with the exception raised at the same spot where yield was called.

    Returning values from generators

    Returning a value from a generator results in the value being put inside the StopIteration exception. We can later recover the value from the exception and use it as needed.

    >>> def test():
    ...     yield 1
    ...     return "abc"
    ...
    >>> gen = test()
    >>> next(gen)
    1
    >>> try:
    ...     next(gen)
    ... except StopIteration as exc:
    ...     print(exc.value)
    ...
    abc
    

    Behold, a new keyword: yield from

    Python 3.3 came with the addition of a new keyword: yield from. This keyword lets us pass any next(), send() and throw() calls on to an inner-most nested generator. If the inner generator returns a value, it is also the return value of yield from:

    >>> def inner():
    ...     inner_result = yield 2
    ...     print('inner', inner_result)
    ...     return 3
    ...
    >>> def outer():
    ...     yield 1
    ...     val = yield from inner()
    ...     print('outer', val)
    ...     yield 4
    ...
    >>> gen = outer()
    >>> next(gen)
    1
    >>> next(gen) # Goes inside inner() automatically
    2
    >>> gen.send("abc")
    inner abc
    outer 3
    4
    

    Putting it all together

    Upon introducing the new keyword yield from in Python 3.3, we were now able to create generators inside generators that, just like a tunnel, pass data back and forth from the inner-most to the outer-most generators. This has spawned a new meaning for generators: coroutines.

    Coroutines are functions that can be stopped and resumed while being run. In Python, they are defined using the async def keyword. Much like generators, they too use their own form of yield from, which is await. Before async and await were introduced in Python 3.5, we created coroutines in the exact same way generators were created (with yield from instead of await).

    async def inner():
        return 1
    
    async def outer():
        await inner()
    

    Just as iterators and generators implement the __iter__() method, coroutines implement __await__(), which allows them to continue every time await coro is called.
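    As a small illustration of __await__: any object whose __await__() returns an iterator can be awaited. The Ready class below is a hypothetical example. It delegates to asyncio.sleep(0) with yield from, which hands control back to the event loop once, and then returns its stored value as the result of the await.

```python
import asyncio


class Ready:
    """Hypothetical awaitable: yields to the loop once, then produces a value."""

    def __init__(self, value):
        self.value = value

    def __await__(self):
        # A coroutine object's __await__() returns an iterator; `yield from`
        # forwards everything it yields up to the Task, just like `await` does.
        yield from asyncio.sleep(0).__await__()
        return self.value  # becomes the value of `await Ready(...)`


async def main():
    return await Ready(7)


value = asyncio.run(main())
print(value)  # 7
```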

    There's a nice sequence diagram inside the Python docs that you should check out.

    In asyncio, apart from coroutine functions, we have 2 important objects: tasks and futures.

    Futures

    Futures are objects that have the __await__() method implemented, and their job is to hold a certain state and result. The state can be one of the following:

    1. PENDING - future does not have any result or exception set.
    2. CANCELLED - future was cancelled using fut.cancel()
    3. FINISHED - future was finished, either by a result set using fut.set_result() or by an exception set using fut.set_exception()

    The result, just like you have guessed, can either be a Python object, that will be returned, or an exception which may be raised.

    Another important feature of future objects is that they have a method called add_done_callback(). This method allows functions to be called as soon as the future is done, whether it raised an exception or finished normally.

    Tasks

    Task objects are special futures, which wrap around coroutines, and communicate with the inner-most and outer-most coroutines. Every time a coroutine awaits a future, the future is passed all the way back to the task (just like in yield from), and the task receives it.

    Next, the task binds itself to the future by calling add_done_callback() on it. From then on, if the future is ever done, whether by being cancelled, by being given an exception, or by being given a Python object as a result, the task's callback will be called, and the task will come back to life.

    Asyncio

    The final burning question we must answer is - how is the IO implemented?

    Deep inside asyncio, we have an event loop. An event loop of tasks. The event loop's job is to call tasks every time they are ready and coordinate all that effort into one single working machine.

    The IO part of the event loop is built upon a single crucial function called select. Select is a blocking function, implemented by the operating system underneath, that allows waiting on sockets for incoming or outgoing data. Upon receiving data it wakes up, and returns the sockets which received data, or the sockets which are ready for writing.

    When you try to receive or send data over a socket through asyncio, what actually happens below is that the socket is first checked if it has any data that can be immediately read or sent. If its .send() buffer is full, or the .recv() buffer is empty, the socket is registered to the select function (by simply adding it to one of the lists, rlist for recv and wlist for send) and the appropriate function awaits a newly created future object, tied to that socket.

    When all available tasks are waiting for futures, the event loop calls select and waits. When one of the sockets has incoming data, or its send buffer has drained, asyncio checks for the future object tied to that socket and sets it to done.

    Now all the magic happens. The future is set to done, the task that added itself before with add_done_callback() rises back to life, and calls .send() on the coroutine, which resumes the inner-most coroutine (because of the await chain), and you read the newly received data from a nearby buffer it was spilled onto.

    Method chain again, in case of recv():

    1. select.select waits.
    2. A ready socket, with data is returned.
    3. Data from the socket is moved into a buffer.
    4. future.set_result() is called.
    5. Task that added itself with add_done_callback() is now woken up.
    6. Task calls .send() on the coroutine which goes all the way into the inner-most coroutine and wakes it up.
    7. Data is being read from the buffer and returned to our humble user.
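    The recv() chain above can be observed with a connected socket pair and the event loop's low-level loop.sock_recv() and loop.sock_sendall() methods, which require non-blocking sockets. On a selector-based loop (the default on Linux and macOS) this goes through steps like those listed; Windows' default proactor loop uses a different OS mechanism. A sketch:

```python
import asyncio
import socket


async def main():
    loop = asyncio.get_running_loop()
    reader, writer = socket.socketpair()
    reader.setblocking(False)  # required by the loop.sock_* methods
    writer.setblocking(False)
    try:
        # Start waiting for data; nothing has been sent yet, so this task suspends.
        recv_task = asyncio.create_task(loop.sock_recv(reader, 1024))
        await asyncio.sleep(0.05)  # the loop is free to run other work meanwhile
        await loop.sock_sendall(writer, b"hello")
        return await recv_task     # resumes once the socket becomes readable
    finally:
        reader.close()
        writer.close()


data = asyncio.run(main())
print(data)  # b'hello'
```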

    In summary, asyncio uses generator capabilities, that allow pausing and resuming functions. It uses yield from capabilities that allow passing data back and forth from the inner-most generator to the outer-most. It uses all of those in order to halt function execution while it's waiting for IO to complete (by using the OS select function).

    And the best of all? While one function is paused, another may run and interleave with the delicate fabric, which is asyncio.

  • Original article: https://www.cnblogs.com/wztshine/p/14460847.html