zoukankan      html  css  js  c++  java
  • Asyncio之EventLoop笔记

    使用事件循环

    Python3.4 采用了一个强大的框架来支持代码的并发执行: asyncio。这个框架使用事件循环来编排回调和异步任务。
    事件循环位于事件循环策略的上下文中-这是 asyncio 所特有的概念。
    下图是协程,事件循环和策略之间的相互作用
    Coroutines, event loops, and policies
    协程可以被认为是可以在明确标记有某种语法元素的阶段“暂停”的函数.
    通过任务对象跟踪协程的状态,由相应的事件循环实例化。 事件循环跟踪当前正在运行的任务,并将 CPU 时间从空闲协程委派给待处理协议。在本章中,我们将更多地了解事件循环接口及其生命周期。将讨论事件循环策略-以及全局 asyncio API 对它们的影响。或者和其他异步工作单元(callbacks, promises/futures, and coroutines), 不同的事件循环,但是事件循环是区别于操作系统的。

    定位当前正在运行的循环

    存在问题

    由于各种原因,并发框架必须能够告诉您事件循环当前是否正在运行以及它是哪一个。例如,您的代码可能必须断言只有一个特定的循环实现正在运行您的任务。因此,只有一个任务可以改变某些共享资源或确保将调度您的回调

    解决方案

    使用全局 asyncio.get_event_loop 和 asyncio.get_running_loop 的 api。
    代码示例 1

    import asyncio
    loop = asyncio.get_event_loop()
    

    输出

    <_UnixSelectorEventLoop running=False closed=False debug=False>
    #windows 输出
    <_WindowsSelectorEventLoop running=False closed=False debug=False>
    

    代码示例 2

    import asyncio
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        print("No loop running")
    

    在 Python 3.7 中,有两种有效的方法来获取当前正在运行的循环实例。
    我们可以调用 asyncio.get_event_loop 或 asyncio.get_running_loop
    但 asyncio.get_event_loop 内部是做了什么?大概下面几点
    1.检查在调用函数时是否有循环运行
    2.返回其 pid 与当前进程 pid 匹配的运行循环(如果有)
    3.如果没有,获取存储在 asynci omodule 中的全局变量中的线程全局 LoopPolicy 实例。
    4.如果没有设置它,则使用锁用 DefaultLoopPolicy 实例化它。(_init_event_loop_policy 方法)
    5.注意,DefaultLoopPolicy 是依赖于操作系统的子类 BaseDefaultEventLoopPolicy,它提供了一个默认的循环实现。获取被调用的事件循环
    6.这是有个问题:仅在主线程上实例化循环并将其分配给线程局部变量时才会使用 loop_policy.get_event_loop 方法。
    如果你不在主线程上并且没有通过其他方式实例化运行循环,则会引发 RuntimeError

    这个过程有一些问题

    • get_event_loop 检查是否存在并返回当前运行的循环
    • 事件循环策略是全局存储线程,而循环实例是本地存储线程
    • 如果你在主线程上,get_event_loop 方法将实例化该循环并在策略中本地保存实例线程。
    • 如果你不在主线程上,它将引发 RuntimeError
      asyncio.get_running_loop 的工作方式不同。 如果有一个正在运行,它将始终返回当前正在运行的循环实例。 如果没有,则会引发 RuntimeError。

    创建一个新的循环实例

    存在问题

    由于 asyncio 中的循环与循环策略的概念紧密耦合,因此不建议通过循环构造函数创建循环实例。
    否则,我们可能会遇到范围问题,因为全局 asyncio.get_event_loop 函数只检索自己创建的循环或通过 asyncio.set_event_loop 设置的循环。

    解决方案

    要创建一个新的事件循环实例,我们将使用 asyncio.new_event_loop 的 API
    注意:此 api 不会更改当前安装的事件循环,但会初始化(asyncio)全局事件循环策略 - 如果之前未初始化的话。
    另一个问题是我们将新创建的循环附加到事件循环策略的观察程序,以确保我们的事件循环监视 UNIX 系统上新生成的子进程的终止

    import asyncio
    import sys
    loop = asyncio.new_event_loop()
    print(loop)  # Print the loop
    asyncio.set_event_loop(loop)
    if sys.platform != "win32":
        watcher = asyncio.get_child_watcher()
        watcher.attach_loop(loop)
    

    上面的代码怎么运行的呢
    如果从主线程调用,那么 asyncio.get_event_loop 应用程序接口仅实例化该循环
    下面是一个循环绑定到线程的例子

    import asyncio
    import threading
    from functools import partial
    
    
    def _worker(worker, *args, **kwargs):
        # 循环存在于循环策略的上下文中。DefaultLoopPolicy 对每个线程的循环进行限定,
        # 不允许通过 asyncio.get_event_loop 在主线程之外创建循环
        # 因此,我们必须通过 asyncio.set_event_loop(asyncio.new_event_loop())创建一个线程本地事件循环。
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(worker(*args, **kwargs))
        finally:
            loop.close()
    
    
    def create_event_loop_thread(worker, *args, **kwargs):
        return threading.Thread(target=partial(_worker, worker), args=args, kwargs=kwargs)
    
    
    async def print_coro(*args, **kwargs):
        print(f"Inside the print coro on {threading.get_ident()}:", (args, kwargs))
    
    
    def start_threads(*threads):
        [t.start() for t in threads if isinstance(t, threading.Thread)]
    
    
    def join_threads(*threads):
        [t.join() for t in threads if isinstance(t, threading.Thread)]
    
    
    def main():
        workers = [create_event_loop_thread(print_coro) for i in range(10)]
        start_threads(*workers)
        join_threads(*workers)
    
    
    if __name__ == '__main__':
        main()
    

    将循环附加到进程

    使用更高级的 multiprocessing 模块,我们可以构建一个跨平台的解决方案,在流程本地事件循环中运行多个协程。
    这样我们就可以规避 GIL 强加的 CPython 限制,并利用 asyncio 来提高 I/O 密集型任务的单核 CPU 使用率。

    ###协程附加到进程
    import asyncio
    import os
    import random
    import typing
    from multiprocessing import Process
    
    processes = []
    
    
    def cleanup():
        global processes
        while processes:
            proc = processes.pop()
            try:
                proc.join()
            except KeyboardInterrupt:
                # Ctrl+C 终止进程
                proc.terminate()
    
    
    async def worker():
        random_delay = random.randint(0, 3)
        result = await asyncio.sleep(random_delay, result=f"Working in process: {os.getpid()}")
        print(result)
    
    
    def process_main(coro_worker: typing.Callable, num_of_coroutines: int, ):
        """
        在单独的进程中运行多个协程的进程类。将在每个进程中运行的函数
        建议使用 asyncio.run 而不是实例化自己的事件循环。
         此示例仅用于说明如何在不同进程中实例化事件循环!
        :param coro_worker:
        :param num_of_coroutines:
        :return:
        """
        loop = asyncio.new_event_loop()
        try:
            workers = [coro_worker() for _ in range(num_of_coroutines)]
            loop.run_until_complete(asyncio.gather(*workers, loop=loop))
        except KeyboardInterrupt:
            print(f"Stoping {os.getpid()}")
            loop.stop()
        finally:
            loop.close()
    
    
    def main(processes, num_procs, num_coros, process_main):
        for _ in range(num_procs):
            proc = Process(target=process_main, args=(worker, num_coros))
            processes.append(proc)
            proc.start()
    
    
    if __name__ == '__main__':
        try:
            main(processes, 10, 2, process_main, )
        except KeyboardInterrupt:
            print("Ctrl+C 停止运行")
        finally:
            cleanup()
            print("CleanUp finished")
    
    

    此示例说明如何编写使用多处理的应用程序。

    运行异步代码而不用担心循环

    如果不想费心修改循环策略和清理异步生成器之后的代码(您将在下一章中了解它们),请使用以下代码。
    如果你只有一个线程和进程,并且只有一个协程需要从头到尾运行,这也很好。

    import asyncio
    async def main():    
        pass
    asyncio.run(main())
    

    在 Python3.6 你可以使用以下方法

    import asyncio
    
    
    async def main():
        pass
    
    
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        try:
            # 清理任何没有完全消耗的异步生成器。
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            loop.close()
    

    如果代码可能运行在线程中,需要使用下面的方式

    import asyncio
    import sys
    
    
    async def main():
        pass
    
    
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    if sys.platform != "win32":
        # 返回当前策略的当前子监视器。
        watcher = asyncio.get_child_watcher()
        # 给一个事件循环绑定监视器。
        # 如果监视器之前已绑定另一个事件循环,那么在绑定新循环前会先解绑原来的事件循环。
        watcher.attach_loop(loop)
        try:
            loop.run_forever()
        finally:
            try:
                loop.run_until_complete(loop.shutdown_asyncgens())
            finally:
                loop.close()
    
    

    判断是否只有一个事件循环

    import asyncio
    async def main(loop):
        assert loop == asyncio.get_running_loop()
        print("ok")
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    

    或者下面这种

    import asyncio
    
    async def main():
        pass
    
    loop = asyncio.get_event_loop()
    # 通过使用 loop.create_task API,可确保协程将在特定循环上运行。
    task = loop.create_task(main())
    task.add_done_callback(lambda fut: loop.stop())
    loop.run_forever()
    

    停止和关闭循环

    import asyncio
    import functools
    
    
    async def main(loop):
        print("Print in main")
    
    
    def stop_loop(fut, *, loop):
        loop.call_soon_threadsafe(loop.stop)
    
    
    loop = asyncio.get_event_loop()
    tasks = [loop.create_task(main(loop)) for _ in range(10)]
    # 为了能够正确地停止循环,我们需要确保已经消耗了所有任务,因此我们通过调用 asyncio.gather 来包装它们并向其 add_done_callback,这将关闭我们的循环。
    asyncio.gather(*tasks).add_done_callback(functools.partial(stop_loop, loop=loop))
    try:
        loop.run_forever()
    finally:
        try:
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            loop.close()
    

    添加循环信号处理程序

    我们通过 loop.add_signal_handler 添加一个新的信号处理程序。添加信号处理器。它类似于信号 API, 在这种情况下,我们决定在每个处理程序结束时停止循环。
    如果要为示例添加另一个处理程序,只需将信号名称添加到 SIGNAL_NAMES 以及以此方式命名的相应处理程序.

    import asyncio
    import functools
    import os
    import signal
    
    SIGNAL_NAMES = ('SIGINT', 'SIGTERM')
    SIGNAL_NAME_MESSAGE = " or ".join(SIGNAL_NAMES)
    
    
    def sigint_handler(signame, *, loop, ):
        print(f"Stopped loop because of {signame}")
        loop.stop()
    
    
    def sigterm_handler(signame, *, loop, ):
        print(f"Stopped loop because of {signame}")
        loop.stop()
    
    
    loop = asyncio.get_event_loop()
    
    for signame in SIGNAL_NAMES:
        loop.add_signal_handler(getattr(signal, signame),
                                functools.partial(locals()[f"{signame.lower()}_handler"], signame, loop=loop))
    
    print("Event loop running forever, press Ctrl+C to interrupt.")
    print(f"pid {os.getpid()}: send {SIGNAL_NAME_MESSAGE} to exit.")
    try:
        loop.run_forever()
    finally:
        loop.close()  # optional
    

    为什么不直接使用 signal API 在循环迭代过程中检查添加到循环中的信号处理程序呢?因为,当它关闭时,不可能向循环添加信号处理程序.另一个好处是,当循环关闭时,信号处理程序会为您清理。

    从循环生成子进程

    异步生成子流程并在单独的部分中有效地分割创建和状态管理是使用循环生成子流程的原因之一。
    下面的解决方案对于异步子流程 api 的大多数非交互式使用已经足够了。
    通过在 Windows 系统上设置适当的事件循环策略,它具有跨平台的优点。

    import asyncio
    import shutil
    import sys
    from typing import Tuple, Union
    
    
    async def invoke_command_async(*command, loop, encoding="UTF-8", decode=True) -> Tuple[
        Union[str, bytes], Union[str, bytes], int]:
        """
        Invoke a command asynchronously and return the stdout, stderr and the process return code.
        :param command:
        :param loop:
        :param encoding:
        :param decode:
        :return:
        """
        if sys.platform != 'win32':
            # 如果不是 windows 系统,防止有线程的使用
            asyncio.get_child_watcher().attach_loop(loop)
        process = await asyncio.create_subprocess_exec(*command,
                                                       stdout=asyncio.subprocess.PIPE,
                                                       stderr=asyncio.subprocess.PIPE,
                                                       loop=loop)
        out, err = await process.communicate()
    
        ret_code = process.returncode
    
        if not decode:
            return out, err, ret_code
    
        output_decoded, err_decoded = out.decode(encoding) if out else None, 
                                      err.decode(encoding) if err else None
    
        return output_decoded, err_decoded, ret_code
    
    
    async def main(loop):
        # shutil 返回路径 cmd 里可执行文件的路径。
    
        out, err, ret_code = await invoke_command_async(shutil.which("ping"), "-c", "1", "8.8.8.8", loop=loop)
        print(out, err, ret_code)
    
    
    if sys.platform == "win32":
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    

    等待子进程终止

    为了确保我们可以在 Windows 下等待子进程的终止,我们将轮询子进程以获得进程返回代码,该代码指示已终止的子进程。

    import asyncio
    
    # Quote from https://docs.python.org/3/library/asyncio-subprocess.html:
    # 在从其他线程执行子进程之前,必须在主线程中实例化子监视器
    # 调用主线程中的 get_child_watcher()函数来实例化子监视器
    import functools
    import shutil
    import sys
    
    if sys.platform == "win32":
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    
    
    def stop_loop(*args, loop, **kwargs):
        loop.stop()
    
    
    async def is_windows_process_alive(process, delay=0.5):
        """
        On windows the signal API is very sparse, meaning we don't have SIGCHILD.
        So we just check if we have a return code on our process object.
        :param process:
        :param delay:
        :return:
        """
        while process.returncode is None:
            await asyncio.sleep(delay)
    
    
    async def main(process_coro, *, loop):
        process = await process_coro
        print(process)
        if sys.platform != "win32":
            child_watcher: asyncio.AbstractChildWatcher = asyncio.get_child_watcher()
            # 观察者连接到循环并方便地为我们调用 watcher.add_child_handler
            # 注册一个新的子处理回调函数。
    
            child_watcher.add_child_handler(process.pid, functools.partial(stop_loop, loop=loop))
        else:
            await is_windows_process_alive(process)
            loop.stop()
    
    
    loop = asyncio.get_event_loop()
    
    process_coro = asyncio.create_subprocess_exec(shutil.which("ping"), "-c", "1", "127.0.0.1",
                                                  stdout=asyncio.subprocess.DEVNULL,
                                                  stderr=asyncio.subprocess.DEVNULL)
    
    loop.create_task(main(process_coro, loop=loop))
    loop.run_forever()
    
  • 相关阅读:
    JDK 动态代理分析
    java class 文件解析
    ARM体系结构和汇编指令
    待整理
    Nand Flash,Nor Flash,CFI Flash,SPI Flash 之间的关系
    CPU与内存的那些事
    关于DMA和它的仇家
    BSS段 data段 text段 堆heap 和 栈stack
    关于常用序号的几点说明(数字序号顺序)
    word表格自动编号,前面加章节号
  • 原文地址:https://www.cnblogs.com/c-x-a/p/11022904.html
Copyright © 2011-2022 走看看