zoukankan      html  css  js  c++  java
  • Asyncio 异步编程模块asyncio 总结

    协程语法

    在Python 3.5+发布之前,asyncio模块使用生成器模拟异步调用,因此具有与当前Python 3.5版本不同的语法 ;以下代码均基于python3.7

    从Python 3.5开始引入了异步async及await关键字。注意,在await func()调用时无需带上括号,先感受一下如下代码:

    import asyncio
    
    
    async def main():
    
        print(await func())
    
    
    async def func():
    
    
        # 这里可以写耗时较长的代码 
        return "Hello, world!"
    
    if __name__ == "__main__":
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
    

    事件循环EventLoop

    事件循环是asyncio的核心,异步任务的运行、任务完成之后的回调、网络IO操作、子进程的运行,都是通过事件循环完成的。在python3.7中,我们甚至完全不用管事件循环,只需要使用高层API,即asyncio中的方法,我们很少直接与事件循环打交道,但是为了更加熟悉asyncio的运行原理,最好还是了解EventLoop的设计原理。

    1、事件循环的创建、获取、设置

    (1)asyncio.get_running_loop()。python3.7新添加的

    (2)asyncio.get_event_loop()

    (3)asyncio.set_event_loop(loop)

    (4)asyncio.new_event_loop()

    2、运行和停止事件循环

    (1)loop.run_until_complete(future)。运行事件循环,直到future运行结束

    (2)loop.run_forever()。在python3.7中已经取消了,表示事件循环会一直运行,直到遇到stop。

    (3)loop.stop()。停止事件循环

    (4)loop.is_running()。如果事件循环依然在运行,则返回True

    (5)loop.is_closed()。如果事件循环已经close,则返回True

    (6)loop.close()。关闭事件循环

    3、创建Future和Task

    (1)loop.create_future(coroutine) ,返回future对象

    (2)loop.create_task(corootine) ,返回task对象

    (3)loop.set_task_factory(factory)

    (4)loop.get_task_factory()

    4、事件循环的时钟

    loop.time()。可以这么理解,事件循环内部也维护着一个时钟,可以查看事件循环现在运行的时间点是多少,就像普通的time.time()类似,它返回的是一个浮点数值,比如下面的代码。转自:

    接下来,在python云环境上运行下面的代码,实地体现一下异步程序调用:

    import asyncio
    
    
    async def cor1():
    
    
        print("cor1 start")
    
        for i in range(10):
            await asyncio.sleep(1.5)
            print("cor1", i)
    
    
    async def cor2():
    
    
        print("cor2 start")
    
        for i in range(15):
            await asyncio.sleep(1)
            print("cor2", i)
    
    loop = asyncio.get_event_loop()
    cors = asyncio.wait([cor1(), cor2()])
    loop.run_until_complete(cors)
    
    # 
     cor2 start
     cor1 start
     cor2 0
     cor1 0
     cor2 1
     cor1 1
     cor2 2
     cor2 3
     cor1 2
     cor2 4
     cor1 3
     cor2 5
     cor2 6
     cor1 4
     cor2 7
     cor1 5
     cor2 8
     cor2 9
     cor1 6
     cor2 10
     cor1 7
     cor2 11
     cor2 12
     cor1 8
     cor2 13
     cor1 9
     cor2 14

    一开始还是通过 asyncio.get_event_loop() 得到事件循环, 之后调用了 run_until_complete( 运行 loop ,等到 future 完成,run_until_complete 即返回 ) ,而调用的参数是一个 async 修饰过的函数的返回值

    异步执行

    asyncio 支持在异步调用任务中使用 Executor 对象 ,将Executor对象作为参数、可调用函数及其自身参数共同作为参数去调用run_in_executor() 函数来驱动异步执行事件

    Executor 调度任务:

    import asyncio
    from concurrent.futures import ThreadPoolExecutor
    
    
    def func(a, b):
        #  做耗时的事情 
        return 'test '+a + b
    
    
    async def main(loop):
        executor = ThreadPoolExecutor() #初始化线程池执行器
        result = await loop.run_in_executor(executor, func, "Hello", " world!")
        print(result)
    
    
    if __name__ == "__main__":
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main(loop))
    
    # 输出: test Hello world!
    

    每个事件循环还有一个“默认”执行程序槽,可以分配给执行程序。要分配执行程序并从循环中调度任务,可以使用set default Executor()方法

    import asyncio
    from concurrent.futures import ThreadPoolExecutor
    
    
    def func(a, b):
        #  做耗时的事情 
        return 'test,'+a + b
    
    
    async def main(loop):
        #  注意:使用“None”作为第一个参数指定“默认”执行器 .
        result = await loop.run_in_executor(None, func, "Hello,", " world!")
        print(result)
    
    if __name__ == "__main__":
        loop = asyncio.get_event_loop()
        loop.set_default_executor(ThreadPoolExecutor())
        loop.run_until_complete(main(loop))
    
    #输出 test,Hello, world!

    parallel.futures中有两种主要的Executor类型,即ThreadPoolExecutor和
    ProcessPoolExecutor。 ThreadPoolExecutor包含一个线程池,可以手动将其设置为通过构造函数的特定线程数,或者默认为计算机上的内核数乘以5。


    ThreadPoolExecutor使用线程池执行分配给它的任务,并且通常在CPU绑定方面更好操作,而不用受I / O约束。与此形成对比的是ProcessPoolExecutor:


    ProcessPoolExecutor只能接受可修改的任务和参数。最常见的非picklable任务的对象的方法。如果必须将对象的方法调度为执行程序中的任务,则必须使用ThreadPoolExecutor

    UVLoop

    uvloop是异步的一种实现。基于libuv的AbstractEventLoop(由nodejs使用)。它兼容99%的异步特性,并且比传统的asyncio. eventloop快得多。uvloop目前在Windows上不可用,使用前先在python云环境上用命令 python -m pip install uvloop安装它

    import asyncio
    import uvloop
    
    if __name__ == "__main__":
        asyncio.set_event_loop(uvloop.new_event_loop())
    
        # 你的业务代码...
    
        

    也可以通过 EventLoopPolicy 在uvloop中设置事件循环工厂 :

    import asyncio
    import uvloop
    
    if __name__ == "__main__":
     asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
     loop = asyncio.new_event_loop()
    

    Event事件异步驱动模型

    概念

    可以使用event事件同步多个协程的调度

    简单地说,就像一场赛跑中,一场令下枪声响起,所有赛跑者根据枪声离开起跑线,开始并行的工作

    举例:

    import asyncio
    import functools
    
    
    # 事件触发函数
    def trigger(event):
        print('EVENT SET')
        event.set()
    
    
    # 唤醒协程等待
    # 事件消费者
    async def consumer_a(event):
        consumer_name = 'Consumer A'
        print('{} waiting'.format(consumer_name))
        await event.wait()
        print('{} triggered'.format(consumer_name))
    
    async def consumer_b(event):
        consumer_name = 'Consumer B'
        print('{} waiting'.format(consumer_name))
        await event.wait()
        print('{} triggered'.format(consumer_name))
    
    # 事件
    event = asyncio.Event()
    # 将 coroutines 放入 future
    main_future = asyncio.wait([consumer_a(event),
                                consumer_b(event)])
    # 事件循环
    event_loop = asyncio.get_event_loop()
    event_loop.call_later(3, functools.partial(trigger, event)) #使用偏函数封装(详见函数式编程https://blog.csdn.net/oSuiYing12/article/details/106211761)
    
    # 3秒后触发事件
    # 完成主future
    done, pending = event_loop.run_until_complete(main_future)
    
    #输出:
    
    Consumer B waiting
    Consumer A waiting
    #等待3秒后继续输出
    EVENT SET
    Consumer B triggered
    Consumer A triggered

    实现简单的Websocket

    在这里,我们使用asyncio创建一个简单的 websocket。我们定义了协程以连接到服务器,并发送/接收消息。网络套接字的通信在主协程中运行,该协程由
    事件循环event loop驱动:

    import asyncio
    import aiohttp  # 需要先在云环境中安装 python -m pip install aiohttp
    
    session = aiohttp.ClientSession()
    
    #  处理上下文管理器
    class EchoWebsocket:
    
        async def connect(self):
            self.websocket = await session.ws_connect("wss://echo.websocket.org")  #建立连接的websocket服务器地址,由于在国外可能访问不到
    
        async def send(self, message):
            self.websocket.send_str(message) #发送消息函数
    
    
        async def receive(self):
            result = (await self.websocket.receive()) #接收消息函数
            return result.data
    
    
    async def main():
        echo = EchoWebsocket()
        await echo.connect()
        await echo.send("Hello World!")
        print(await echo.receive()) # 正常情况下会返回打印出"Hello World!"
    
    if __name__ == '__main__':
        # The main loop
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
  • 相关阅读:
    Django
    C++开源库集合
    单细胞参考文献 single cell
    第三章 RNA测序
    第二章 基因芯片
    前言 转录组
    生物信息学——RNA的剪切过程
    生信研究内容
    测序总结,高通量测序名词
    单端测序,双端测序,基因组计划图谱
  • 原文地址:https://www.cnblogs.com/lincappu/p/13700766.html
Copyright © 2011-2022 走看看