zoukankan      html  css  js  c++  java
  • python 多进程和多线程3 —— asyncio

    asyncio 被用作 提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。

    asyncio 提供一组 高层级 API 用于:

    此外,还有一些 低层级 API 以支持 库和框架的开发者 实现:

    event_loop 事件循环:程序开启一个无限的循环,会把一些函数注册到事件循环上。当满足事件发生时,调用相应的协程函数。

    • task:Future子类,对协程进一步封装,其中包含任务的各种状态。

    • future: 代表将来执行或没有执行的任务的结果。它和task上没有本质的区别

    • async/await 关键字:async定义一个协程,await用于挂起阻塞的异步调用接口。

    asyncio.create_task和loop.create_task以及asyncio.ensure_future

    • asyncio.create_task就是用的loop.create_task. 而loop.create_task接受的参数需要是一个协程
    • asyncio.ensure_future除了接受协程,还可以是Future对象或者awaitable对象:

      1. 如果参数是协程,其底层使用loop.create_task,返回Task对象

      2. 如果是Future对象会直接返回

      3. 如果是一个awaitable对象,会await这个对象的__await__方法,再执行一次ensure_future,最后返回Task或者Future。

    asyncio.gather和asyncio.wait

    1. asyncio.gather能收集协程的结果,且会按照输入协程的顺序保存对应协程的执行结果,而asyncio.wait的返回值有两项,第一项是完成的任务列表,第二项表示等待完成的任务列表。
    2. asyncio.wait 支持接受一个参数return_when,默认情况下asyncio.wait会等待全部任务完成(return_when='ALL_COMPLETED'),它还支持FIRST_COMPLETED(第一个协程完成就返回)和FIRST_EXCEPTION(出现第一个异常就返回):
    # 定义协程coroutine:调用不会立即执行,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
    async def my_task(x):
        print('Waiting: ', x)
        return "Done"
     
    # 消息循环: 从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程扔到EventLoop中执行,就实现异步IO。
    loop = asyncio.get_event_loop()       # new_event_loop
    loop.run_until_complete(my_task(2))   # 堵塞直到所有tasks完成
     
     
    # 创建task
    task = loop.create_task(my_task(2))   # 或 task = asyncio.ensure_future(my_task(2))
    loop.run_until_complete(task)         # 运行任务直到Future完成并返回它的结果,task.result()协程返回值
     
     
    # 绑定回调
    def callback(future):
        print('Callback: ', future.result())
    task.add_done_callback(callback)
    loop.run_until_complete(task)      # coroutine执行结束时会调用回调函数,并通过参数future获取协程执行的结果, 创建的task和回调里的future对象,实际上是同一个对象。
     
     
    # 在函数定义时用async def foo()代替 @gen.coroutine 装饰器, 用 await 代替yield. 
    # 协程遇到await,事件循环会将其挂起,执行别的协程,直到其他协程也挂起或执行完毕,再进行下一个协程的执行。
    async def my_task(x):
        print('Waiting: ', x)
        await asyncio.sleep(x)
        return "Done"
     
     
    # 并发: 不同代码块交替执行的性能,交替执行。
    # 并行: 不同代码块同时执行的性能,同时执行。
    tasks = [
        asyncio.ensure_future(my_task(1)),
        asyncio.ensure_future(my_task(2)),
        asyncio.ensure_future(my_task(3))
    ]
    # tasks = [my_task(i) for i in range(1,4)]
    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt as e:
        print(asyncio.Task.all_tasks())
        for task in asyncio.Task.all_tasks():   # 循环task,逐个cancel; 或用 asyncio.gather(*asyncio.Task.all_tasks()).cancel()
            print(task.cancel())
        loop.stop()
        loop.run_forever()   # loop stop后还需要再次开启事件循环,最后再close,不然还会抛出异常!!
    finally:
        loop.close()
    for task in tasks:
        print('Task ret: ', task.result())
     
     
    # ====================================================================
    # 协程嵌套1: asyncio.wait 将协程列表包装成Task(Future子类)并等待其执行完成
    async def main():
        tasks = [
            asyncio.ensure_future(my_task(1)),
            asyncio.ensure_future(my_task(2)),
            asyncio.ensure_future(my_task(3))
        ]
        dones, pendings = await asyncio.wait(tasks)
        for task in dones:
            print('Task ret: ', task.result())
     
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
     
     
    # 协程嵌套2: asyncio.gather支持多个协程交给loop
    async def main():
        tasks = [
            asyncio.ensure_future(my_task(1)),
            asyncio.ensure_future(my_task(2)),
            asyncio.ensure_future(my_task(3))
        ]
        return await asyncio.gather(*tasks)   # 把多个futures包装成单个future
     
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(main())
     
     
    # 协程嵌套3: asyncio.as_completed
    async def main():
        for task in asyncio.as_completed(tasks):
            result = await task
            print('Task ret: {}'.format(result))
    done = loop.run_until_complete(main())
     
     
    # ====================================================================
    def start_loop(loop):
        asyncio.set_event_loop(loop)
        loop.run_forever()
    def more_work(x):
        print('More work {}'.format(x))
        time.sleep(x)   # 同步阻塞
        print('Finished more work {}'.format(x))
     
     
    # 动态添加协程到事件循环: 多线程
    new_loop = asyncio.new_event_loop()
    t = Thread(target=start_loop, args=(new_loop,))
    t.start()
    new_loop.call_soon_threadsafe(more_work, 6)
    new_loop.call_soon_threadsafe(more_work, 3)
     
     
    # 子线程中进行事件循环的并发操作,同时主线程又不会被block
    asyncio.run_coroutine_threadsafe(my_task(6), new_loop)
    asyncio.run_coroutine_threadsafe(my_task(4), new_loop)
     
     
    # master-worker主从模式: 主线程用来监听队列,子线程用于处理队列
    t.setDaemon(True)    # 设置子线程为守护线程, 当主线程结束的时候,子线程也随机退出。
    try:
        while True:
            task = rcon.rpop("queue")
            if not task:
                time.sleep(1)
                continue
            asyncio.run_coroutine_threadsafe(my_task(int(task)), new_loop)
    except KeyboardInterrupt as e:
        print(e)
        new_loop.stop()
     
     
    loop.run_until_complete(future):协程()、task、asyncio.wait(tasks)、asyncio.gather(
    *tasks) # 阻塞调用,直到协程运行结束,它才返回。 loop.run_forever() # 一直运行,直到loop.stop被调用. 但不能直接调用,可用gather把多个协程合并成一个future,并添加回调,然后在回调里再去停止 loop。 @asyncio.coroutine把一个generator标记为coroutine类型,然后就把这个coroutine扔到EventLoop中执行。 asyncio.sleep() # 协程,可模拟IO操作如网络请求,文件读取等

    参考: https://www.jianshu.com/p/b5e347b3a17c

  • 相关阅读:
    【Linux软件安装】
    Java IO(七)ByteArrayInputStream 和 ByteArrayOutputStream
    Java IO(六) ObjectInputStream 和 ObjectOutputStream
    Java IO(四) InputStream 和 OutputStream
    Java IO(五)字节流 FileInputStream 和 FileOutputStream
    Java IO(三)FileDescriptor
    Java IO(二)File
    Java IO(一)概述
    Java中的集合(十五) Iterator 和 ListIterator、Enumeration
    Java中的自动装箱拆箱
  • 原文地址:https://www.cnblogs.com/bsszds930/p/12956781.html
Copyright © 2011-2022 走看看