zoukankan      html  css  js  c++  java
  • 异步编程

    异步编程

    一、事件循环

    我们可以理解为就是一个 循环 并去检测并执行一些代码。

    # 伪代码
    
    
    任务列表 = [ {'任务1':可执行},{'任务2':IO阻塞},{"任务3":"已完成"}] 
    
    
    while True:
    
        可执行任务列表 = [] 去任务列表检测所有任务, 拿到可执行任务 
        已完成任务列表 = [] 去任务列表检测所有任务,已完成的任务返回
        
        for 就绪任务 in 可执行任务列表:
            执行已就绪任务
        for 已完成任务 in 已完成任务列表:
            任务列表中 移除已完成任务
        if 任务list 中的任务都已经完成 结束循环
            pass
    

    事件循环 我们可以看这段伪代码 去理解 就够了。

    # 去生成一个任务循环
    loop = asyncio.get_event_loop()
    # 将任务放到 "任务list"
    loop.run_until_complete(asyncio.wait(tasks))
    

    二、快速上手

    协程函数、定义函数的时候async def 函数名
    协程对象、协程函数() 得到协程对象 但是内部代码把 不会执行

    async def func():
        pass
    ret = func()
    

    注意⚠️ 执行协程函数创建协程对象,函数内部不会执行

    如果想要运行协程函数内部代码 ,必须要交给事件循环

    import asyncio
    async def func(): 
        print("我执行了")
        
    ret = func()
    
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(ret)
    
    # python3.5 之后 可以这样写
    
    asyncio.run( ret ) 
    

    三、await关键字

    await 字面意识等待的意识 await 后面一定要跟可等待的对象(协程对象 ,Future ,Task对象)>> io 等待

    示例1:

    import asyncio
    
    async def func():
        print("进入程序")
        response = await  asyncio.sleep(2)
        print("程序结束",response)
    
    ret = func()
    
    asyncio.run(ret)
    

    示例2:

    import asyncio
    
    async def func():
        print("开始")
        response = await  asyncio.sleep(2)
        print("结束",response)
    
    
    
    async def func1():
    
        print("进入程序")
    
        # 遇到 IO 操作挂起当前协程任务 ,等IO操作 完成之后再继续往下执行。当前协程挂起,事件循环可去执行其他协程任务
    
        await func()
    
        print("程序结束")
    
    ret = func1()
    
    asyncio.run(ret)
    

    await 就是等待对象的值得到结果之后在往下走

    注意⚠️: 有的人会感觉没什么有 感觉就是同步执行 一直在等

    四、task对象

    官方定义

    Tasks are used to schedule coroutines concurrently.
    When a coroutine is wrapped into a Task with functions likeasyncio.create_task() the coroutine is automatically scheduled to run soon。

    Tasks用于并发调度协程,通过asyncio.create_task(协程对象)的方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行。除了使用 asyncio.create_task() 函数以外,还可以用低层级的 loop.create_task()ensure_future() 函数。不建议手动实例化 Task 对象。

    本质上是将协程对象封装成task对象,并将协程立即加入事件循环,同时追踪协程的状态。

    注意:asyncio.create_task() 函数在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低层级的 asyncio.ensure_future() 函数。

    import asyncio
    
    
    async def func():
        print(1)
        await asyncio.sleep(2)
        print(2)
        return "返回值"
    
    
    async def main():
        print("main开始")
    
        # 创建协程,将协程封装到一个Task对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
        task1 = asyncio.create_task(func())
    
        # 创建协程,将协程封装到一个Task对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
        task2 = asyncio.create_task(func())
    
        print("main结束")
    
        # 当执行某协程遇到IO操作时,会自动化切换执行其他任务。
        # 此处的await是等待相对应的协程全都执行完毕并获取结果
        ret1 = await task1
        ret2 = await task2
        print(ret1, ret2)
    
    
    asyncio.run(main())
    

    示例2:

    import asyncio
    
    
    async def func():
        print(1)
        await asyncio.sleep(2)
        print(2)
        return "返回值"
    
    
    async def main():
        print("main开始")
    
        # 创建协程,将协程封装到Task对象中并添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
        # 在调用
        task_list = [
            asyncio.create_task(func(), name="n1"),
            asyncio.create_task(func(), name="n2")
        ]
    
        print("main结束")
    
        # 当执行某协程遇到IO操作时,会自动化切换执行其他任务。
        # 此处的await是等待所有协程执行完毕,并将所有协程的返回值保存到done
        # 如果设置了timeout值,则意味着此处最多等待的秒,完成的协程返回值写入到done中,未完成则写到pending中。
        done, pending = await asyncio.wait(task_list, timeout=None)
        print(done, pending)
    
    
    asyncio.run(main())
    

    注意:asyncio.wait 源码内部会对列表中的每个协程执行ensure_future从而封装为Task对象,所以在和wait配合使用时task_list的值为[func(),func()] 也是可以的。

    示例:

    import asyncio
    
    
    async def func():
        print("执行协程函数内部代码")
    
        # 遇到IO操作挂起当前协程(任务),等IO操作完成之后再继续往下执行。当前协程挂起时,事件循环可以去执行其他协程(任务)。
        response = await asyncio.sleep(2)
    
        print("IO请求结束,结果为:", response)
        return "返回值"
    
    
    coroutine_list = [func(), func()]
    
    # 错误:coroutine_list = [ asyncio.create_task(func()), asyncio.create_task(func()) ]
    # 此处不能直接 asyncio.create_task,因为将Task立即加入到事件循环的任务列表,
    # 但此时事件循环还未创建,所以会报错。
    
    
    # 使用asyncio.wait将列表封装为一个协程,并调用asyncio.run实现执行两个协程
    # asyncio.wait内部会对列表中的每个协程执行ensure_future,封装为Task对象。
    done,pending = asyncio.run( asyncio.wait(coroutine_list) )
    
    for ret in done:
        print(ret.result())
    

    结果:

    执行协程函数内部代码
    执行协程函数内部代码
    IO请求结束,结果为: None
    IO请求结束,结果为: None
    返回值
    返回值
    

    五、 asyncio.Future对象

    A Futureis a special low-level awaitable object that represents an eventual result of an asynchronous operation.

    asyncio中的Future对象是一个相对更偏向底层的可对象,通常我们不会直接用到这个对象,而是直接使用Task对象来完成任务的并和状态的追踪。( Task 是 Futrue的子类 )

    Future为我们提供了异步编程中的 最终结果 的处理(Task类也具备状态处理的功能)。
    示例:

    import asyncio
    
    async def main():
        # 获取当前事件循环
        loop = asyncio.get_running_loop()
    
        # # 创建一个任务(Future对象),这个任务什么都不干。
        fut = loop.create_future()
    
        # 等待任务最终结果(Future对象),没有结果则会一直等下去。
        await fut
    
    asyncio.run(main())
    

    示例2:

    import asyncio
    
    async  def func(fut):
        await asyncio.sleep(2)
        fut.set_result("任务")
    
    async def func1():
        # 获取事件循环
        loop = asyncio.get_event_loop()
        # 创建一个任务(Future对象),没绑定任何行为,则这个任务永远不知道什么时候结束。
        fut = loop.create_future()
    
        # 创建一个任务(Task对象),绑定了set_after函数,函数内部在2s之后,会给fut赋值。
        # 即手动设置future任务的最终结果,那么fut就可以结束了。
        
        loop.create_task(func(fut=fut))
        # 等待 Future对象获取 最终结果,否则一直等下去
        return await fut
    
    
    
    ret = asyncio.run(func1())
    print(ret)
    

    Future对象本身函数进行绑定,所以想要让事件循环获取Future的结果,则需要手动设置。而Task对象继承了Future对象,其实就对Future进行扩展,他可以实现在对应绑定的函数执行完成之后,自动执行set_result,从而实现自动结束。

    虽然,平时使用的是Task对象,但对于结果的处理本质是基于Future对象来实现的。

    扩展:支持 await 对象语 法的对象课成为可等待对象,所以 协程对象Task对象Future对象 都可以被成为可等待对象

    六、futures.Future对象

    在Python的concurrent.futures模块中也有一个Future对象,这个对象是基于线程池和进程池实现异步操作时使用的对象。

    import time
    from concurrent.futures import Future
    from concurrent.futures.thread import ThreadPoolExecutor
    from concurrent.futures.process import ProcessPoolExecutor
    
    
    def func(value):
        time.sleep(1)
        print(value)
    
    
    pool = ThreadPoolExecutor(max_workers=5)
    # 或 pool = ProcessPoolExecutor(max_workers=5)
    
    
    for i in range(10):
        fut = pool.submit(func, i)
        print(fut)
    

    两个Future对象是不同的,他们是为不同的应用场景而设计,例如:concurrent.futures.Future不支持await语法 等。

    在Python提供了一个将futures.Future 对象包装成asyncio.Future对象的函数 asynic.wrap_future

    接下里你肯定问:为什么python会提供这种功能?

    其实,一般在程序开发中我们要么统一使用 asycio 的协程实现异步操作、要么都使用进程池和线程池实现异步操作。但如果 协程的异步进程池/线程池的异步 混搭时,那么就会用到此功能了。

    import time
    import asyncio
    import concurrent.futures
    
    def func1():
        # 某个耗时操作
        time.sleep(2)
        return "SB"
    
    async def main():
        loop = asyncio.get_running_loop()
    
        # 1. Run in the default loop's executor ( 默认ThreadPoolExecutor )
        # 第一步:内部会先调用 ThreadPoolExecutor 的 submit 方法去线程池中申请一个线程去执行func1函数,并返回一个concurrent.futures.Future对象
        # 第二步:调用asyncio.wrap_future将concurrent.futures.Future对象包装为asycio.Future对象。
        # 因为concurrent.futures.Future对象不支持await语法,所以需要包装为 asycio.Future对象 才能使用。
        fut = loop.run_in_executor(None, func1)
        result = await fut
        print('default thread pool', result)
    
        # 2. Run in a custom thread pool:
        # with concurrent.futures.ThreadPoolExecutor() as pool:
        #     result = await loop.run_in_executor(
        #         pool, func1)
        #     print('custom thread pool', result)
    
        # 3. Run in a custom process pool:
        # with concurrent.futures.ProcessPoolExecutor() as pool:
        #     result = await loop.run_in_executor(
        #         pool, func1)
        #     print('custom process pool', result)
    
    asyncio.run(main())
    

    应用场景:当项目以协程式的异步编程开发时,如果要使用一个第三方模块,而第三方模块不支持协程方式异步编程时,就需要用到这个功能,例如:

    import asyncio
    import requests
    
    
    async def download_image(url):
        # 发送网络请求,下载图片(遇到网络下载图片的IO请求,自动化切换到其他任务)
        print("开始下载:", url)
    
        loop = asyncio.get_event_loop()
        # requests模块默认不支持异步操作,所以就使用线程池来配合实现了。
        future = loop.run_in_executor(None, requests.get, url)
    
        response = await future
        print('下载完成')
        # 图片保存到本地文件
        file_name = url.rsplit('_')[-1]
        with open(file_name, mode='wb') as file_object:
            file_object.write(response.content)
    
    
    if __name__ == '__main__':
        url_list = [
            'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
            'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
            'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
        ]
    
        tasks = [download_image(url) for url in url_list]
    
        loop = asyncio.get_event_loop()
        loop.run_until_complete( asyncio.wait(tasks) )
    

    七、异步迭代器

    什么是异步迭代器

    实现了 __aiter__()__anext__() 方法的对象。__anext__ 必须返回一个 awaitable 对象。async for 会处理异步迭代器的 __anext__() 方法所返回的可等待对象,直到其引发一个 StopAsyncIteration 异常。由 PEP 492 引入。

    什么是异步可迭代对象?

    可在 async for 语句中被使用的对象。必须通过它的 __aiter__() 方法返回一个 asynchronous iterator。由 PEP 492 引入。

    import asyncio
    
    
    class Reader(object):
        """ 自定义异步迭代器(同时也是异步可迭代对象) """
    
        def __init__(self):
            self.count = 0
    
        async def readline(self):
            await asyncio.sleep(1)
            self.count += 1
            if self.count == 100:
                return None
            return self.count
    
        def __aiter__(self):
            return self
    
        async def __anext__(self):
            val = await self.readline()
            if val == None:
                raise StopAsyncIteration
            return val
    
    
    async def func():
        # 创建异步可迭代对象
        async_iter = Reader()
    
        # async for 必须要放在async def函数内,否则语法错误。
        async for item in async_iter:
            print(item)
    
    asyncio.run(func())
    

    异步迭代器其实没什么太大的作用,只是支持了async for语法而已

    八、异步上下文管理器

    此种对象通过定义 __aenter__()__aexit__() 方法来对 async with 语句中的环境进行控制。由 PEP 492 引入。

    import asyncio
    
    
    class AsyncContextManager:
    	def __init__(self):
            self.conn = conn
            
        async def do_something(self):
            # 异步操作数据库
            return 666
    
        async def __aenter__(self):
            # 异步链接数据库
            self.conn = await asyncio.sleep(1)
            return self
    
        async def __aexit__(self, exc_type, exc, tb):
            # 异步关闭数据库链接
    		await asyncio.sleep(1)
    
    
    async def func():
        async with AsyncContextManager() as f:
            result = await f.do_something()
            print(result)
    
    
    asyncio.run(func())
    

    这个异步的上下文管理器还是比较有用的,平时在开发过程中 打开、处理、关闭 操作时,就可以用这种方式来处理。

    总结

    在程序中只要看到asyncawait关键字,其内部就是基于协程实现的异步编程,这种异步编程是通过一个线程在IO等待时间去执行其他任务,从而实现并发。

    以上就是异步编程的常见操作,内容参考官方文档。

  • 相关阅读:
    java 爬虫 爬取豆瓣 请不要害羞 图片
    Intellij idea 基本配置
    Linux 基本操作
    Java 快速排序
    Codeforces 986D Perfect Encoding FFT
    Codeforces 749E Gosha is hunting 二分+DP
    BZOJ5305 [Haoi2018]苹果树
    Codeforces 666E Forensic Examination SAM+权值线段树
    BZOJ3712[PA2014]Fiolki 建图+倍增lca
    Codeforces 576D Flights for Regular Customers 矩阵快速幂+DP
  • 原文地址:https://www.cnblogs.com/jiangchunsheng/p/12973361.html
Copyright © 2011-2022 走看看