zoukankan      html  css  js  c++  java
  • 协程&异步编程&asyncio

    协程&异步编程&asyncio

    1.项目示例

    1.1在python3.7 之前的版本

    主要为

    asyncio.ensure_future() 实例化task

    使用

    loop = asyncio.get_event_loop()

    done, deping = loop.run_until_complete(asyncio.wait(tasks, timeout=4))

    来运行

    import asyncio
    
    async def run():
        print("hello")
        await asyncio.sleep(5)
        print("world")
    
    async def run1():
        print("hello")
        await asyncio.sleep(3)
        print("world")
    
    # 创建并发
    tasks = [asyncio.ensure_future(run()), asyncio.ensure_future(run1())]
    # 创建应用池
    loop = asyncio.get_event_loop()
    # done 完成的
    # deping 未完成的
    # timeout 最长等带时间
    done, deping = loop.run_until_complete(asyncio.wait(tasks, timeout=4))
    print("done:", done)
    print("deping:", deping)
    
    hello
    hello
    world
    done: {<Task finished name='Task-2' coro=<run1() done, defined at c:UsersAdministratorDesktopexam协程旧的版本.py:8> result=None>}
    deping: {<Task pending name='Task-1' coro=<run() running at c:UsersAdministratorDesktopexam协程旧的版本.py:5> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001EF104A8040>()]>>}
    

    1.2.在python3.7 之后

    将 手动创建 task 和 运行方式进行了修改

    使用: async.create_task() 来创建task 实例

    使用 : asyncio.run() 来运行

    示例:

    import asyncio
    
    async def run():
        print("hello")
        await asyncio.sleep(2)
        print("world")
        return 1
    
    async def run1():
        print("hello")
        await asyncio.sleep(5)
        print("world")
        
    # 主函数
    async def main():
        # 创建task, name 任务名称
        tasks = [ asyncio.create_task(run(), name="n1"), 
                 asyncio.create_task(run(), name="n2")]
        # done 已完成的
        # pending 未完成的
     	# 这一步主要是为了执行 tasks 里的---> 等待 task里的任务
        done, pending =  await asyncio.wait(tasks, timeout=None)
        return done, pending
    
    # 运行
    done, pending = asyncio.run( main() )
    print("done:", done)
    print("pending:", pending)
    
    C:UsersAdministratorDesktopexam>C:/Python39/python.exe "c:/Users/Administrator/Desktop/exam/协程/1. 简单示例3.4后.py"
    hello
    hello
    world
    world
    done: {<Task finished name='n1' coro=<run() done, defined at c:UsersAdministratorDesktopexam协程1. 简单示例3.4后.py:3> result=1>, <Task finished name='n2' coro=<run() done, defined at c:UsersAdministratorDesktopexam协程1.  
    简单示例3.4后.py:3> result=1>}
    pending: set()
    

    2.await ----->等待

    ​ await + 可等待对象 (协程对象, Future 对象, Task 对象) ---> IO 等待

    携程对象: ----> async 修饰的函数 + ()

    ​ 流程:

    ​ 如果有其他任务, 在遇到IO等待(await) 的时候去执行其他任务。

    示例1:

    import asyncio
    
    async def other():
        print("hello")
        await asyncio.sleep(3)
        print("world")
        return "返回值"
    asyncio.run(other())
    
    

    hello
    world

    示例2:

    import asyncio
    
    async def other():
        print("hello")
        await asyncio.sleep(3)
        print("world")
        return "返回值"
    
    
    async def func():
        print("执行协程函数内部代码")
        #respone = await func()
        task = asyncio.create_task(other())
        print("执行结束")
    
    asyncio.run(func())
    

    结果:

    执行协程函数内部代码
    执行结束
    hello

    在这里我们发现, 我们开始执行协程内部代码遇到内部的await 并没有等,而是直接结束了,

    示例3:

    import asyncio
    
    async def other():
        print("hello")
        await asyncio.sleep(3)
        print("world")
        return "返回值"
    
    
    async def func():
        print("执行协程函数内部代码")
        respone = await other()
        print("执行结束")
    
    asyncio.run(func())
    

    执行协程函数内部代码
    hello
    world
    执行结束

    我们发现这样只有等内部结束后才会执行,外部接下来的代码

    示例4:

    import asyncio
    
    async def other():
        print("hello")
        await asyncio.sleep(3)
        print("world")
        return "返回值"
    
    async def other2():
        print("other---->开始")
        await asyncio.sleep(5)
        print("other ----> 结束")
    
    async def func():
        print("执行协程函数内部代码")
        respone = await other()
        respone2 = await other2()
        print("执行结束")
    
    asyncio.run(func())
    

    执行协程函数内部代码
    hello
    world
    other---->开始
    other ----> 结束
    执行结束

    同样, await 结束后才执行下一个

    3.task

    ​ 在事件循环中并发的添加多个任务的。可以说解决了上面的 await 的问题。

    1. 使用asyncio.create_task()  创建 3.7 之后, 建议
    2. loop.create_task() 或者 asyncio.ensure_future() 来创建
    
    async def run():
        print("hello")
        await asyncio.sleep(2)
        print("world")
        return 1
    
    async def run1():
        print("hello")
        await asyncio.sleep(5)
        print("world")
        
        
    tasks = [run(), run1()]
    done, pending = asyncio.run( asyncio.wait(tasks) )
    print("done:", done)
    print("pending:", pending)
    
    import asyncio
    
    async def run():
        print("hello")
        await asyncio.sleep(2)
        print("world")
        return 1
    
    async def run1():
        print("hello")
        await asyncio.sleep(5)
        print("world")
        
    # 主函数
    async def main():
        # 创建task, name 任务名称
        tasks = [ asyncio.create_task(run(), name="n1"), 
                 asyncio.create_task(run1(), name="n2")]
        # 或者
        # tasks = [
        #    run(),
        #    run1()
        # ]
        # done 已完成的
        # pending 未完成的
     	# 这一步主要是为了执行 tasks 里的---> 等待 task里的任务
        done, pending =  await asyncio.wait(tasks, timeout=None)
        return done, pending
    
    # 运行
    done, pending = asyncio.run( main() )
    print("done:", done)
    print("pending:", pending)
    

    4. asyncio.Future 对象

    他是 task 的基类,一个更低级的接口帮助我们等待task的结果

    Task 继承 Future, Task 对象内部 await 结果的处理基于Future 对象来的。

    示例1:

    async def main():
        # 获取当前事件循环
        loop = asyncio.get_running_loop()
        # 创建一个任务(Future对象), 这个任务什么都不干
        fut = loop.create_future()
        # 这个测试中这里始终等不到对象
        await fut
    asyncio.run(main())
    

    示例2:

    import asyncio
    
    async def set_after(fut):
        await asyncio.sleep(2)
        # 给fut 设置结果
        fut.set_resukt("666")
    
    async def main():
        # 获取当前事件循环
        loop = asyncio.get_running_loop()
        # 创建一个任务(Future对象), 这个任务什么都不干
        fut = loop.create_future()
        await loop.create_task(set_after(fut))
        # 这里会等到set_after 为fut设置结果, 等待结束
        data = await fut
        print(data)
    asyncio.run(main())
    

    5. concurrent.futures.Future 对象

    使用线程池、进程池实现异步操作时用到的对象。

    帮助我们来获取结果

    https://docs.python.org/zh-cn/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor

    示例1:

    import asyncio
    import time
    import concurrent.futures
    
    def blocking_io():
        # File operations (such as logging) can block the
        # event loop: run them in a thread pool.
        print("hello io")
        time.sleep(5) # 这时候这里可以是任何需要等待结果的操作
        print("world io")
        return "1111"
    
    def cpu_bound():
        # CPU-bound operations will block the event loop:
        # in general it is preferable to run them in a
        # process pool.
        print("hello cpu")
        time.sleep(5)
        print("world cpu")
        return "2222"
    
    async def f(fun):
        # 重点, 以线程池或进程池的方式启动
        loop = asyncio.get_event_loop()
        # run_in_executor 的方式启动,则可变成可等待对象。
        result =  loop.run_in_executor(
            None, fun)
        # 等待结果
        response = await result
        print(response)
    
    async def main():
        task = [ f(blocking_io), f(cpu_bound)]
        await asyncio.wait(task)
    
    asyncio.run( main())
    
    
    # 注
    # get_event_loop  只会在主线程帮您创建新的 event loop
    # get_running_loop 用于获取当前正在运行的loop,如果当前主线程中没有正在运行的loop,如果没有就会报RuntimeError 错误。
    

    本来这种方式是不支持 async, 目前的实现结合了async 实际上是通过 线程池来实现的

    loop.run_in_executor( None, fun)

    第一个参数传None 使用默认, 在上边连接官网中有介绍

    同时也可以把 asyncio.run(main()) 改成

    task = [ f(blocking_io), f(cpu_bound)]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(task))
    

    6.异步上下文管理器

    此种对象通过定义 __aenter__()__aexit__()方法来对 async_with 语句中的环境进行控制。

    import asyncio
    
    class AsyncContentManager:
        def __init__(self):
            pass
        
        async def do_something(self):
            return 666
        
        async def __aenter__(self):
            # 连接数据库   
            return self
        
        async def __aexit__(self, exc_type, exc, tb):
            # 异步关闭数据库
            await asyncio.sleep(1)
    
    async def func():
        async with AsyncContentManager() as f:
            result = await f.do_something()
    asyncio.run( func() )
    

    7. uvloop

    ​ 是asyncio的事件循环的替代方案 > 默认asyncio的事件循环。

    pip install uvloop	
    
    import asyncio
    import uvloop
    
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    
    # 编写asyncio的代码,与之前写的代码一致
    # 内部的事件循环自动化会变成uvloop
    asyncio.run()
    
    

    8. 实操

    ​ 8.1 异步redis

    ​ 在python 操作redis, 连接/操作/断开都是网络IO

    pip install aioredis
    

    示例1:

    import asyncio
    import aioredis
    
    async def execute(address, passwd, test):
        """
        	@address: redis 连接地址
        	@passwd: redis 密码
        """
        print("开始执行", test)
        
        redis = await aioredis.create_redis_pool(address=address, password=passwd)
        # 网络io操作
        await redis.hmset_dict("car", key1=1, key2=2, key3=3)
        
        # 网络io操作、遇到io自动切换任务
        result = await redis.hgetall("car", encoding="utf-8")
        print(result)
        
        redis.close()
        #  网络io操作, 遇到io会自动切换任务
        await redis.wait_closed()
        print("结束", test)
    
    tasks = [
        execute("redis://127.0.0.1:6379", "295213", "测试1"),
        execute("redis://127.0.0.1:6379", "295213", "测试2")
    ]
    
    asyncio.run(asyncio.wait(tasks))
    

    结果

    开始执行 测试2
    开始执行 测试1
    {'key1': '1', 'key2': '2', 'key3': '3'}
    {'key1': '1', 'key2': '2', 'key3': '3'}
    结束 测试2
    结束 测试1
    
    

    8.2 异步mysql

    pip install aiomysql
    

    示例1:

    import asyncio
    import aiomysql
    
    async def execute(host, password, test):
        print("开始", test)
        # 建立连接
        conn = await aiomysql.connect(host=host, port=3306, user="root", password=password, db="work")
        # 创建指针
        cur = await conn.cursor()
        
        # 查询
        await cur.execute("select * from users limit 1")
        data = await cur.fetchall()
        print(data)
        await cur.close()
        conn.close()
        print("结束", test)
        
    tasks = [
        execute("127.0.0.1", "295213", "测试1"),
        execute("xxx.xx.xx.xx", "295213", "测试2")
    ]
    
    asyncio.run(asyncio.wait(tasks))
    
  • 相关阅读:
    git 通过 fork 解决代码冲突
    vue-cli3.0 多页面和单页面的配置以及相互之间的切换
    关于切换环境变量的问题( 以vue-cli3.0 为例)
    vue-router 钩子
    Android eMMC 分区详解(转载)
    《PE总结 》– 重定位表(转载)
    Linux 文件系统
    爬虫登录,立FLAG
    ios tweak 开发
    ios app 砸壳
  • 原文地址:https://www.cnblogs.com/ShanCe/p/14550146.html
Copyright © 2011-2022 走看看