zoukankan      html  css  js  c++  java
  • 协程&异步编程&asyncio

    协程&异步编程&asyncio

    1.项目示例

    1.1在python3.7 之前的版本

    主要为

    asyncio.ensure_future() 实例化task

    使用

    loop = asyncio.get_event_loop()

    done, deping = loop.run_until_complete(asyncio.wait(tasks, timeout=4))

    来运行

    import asyncio
    
    async def run():
        print("hello")
        await asyncio.sleep(5)
        print("world")
    
    async def run1():
        print("hello")
        await asyncio.sleep(3)
        print("world")
    
    # 创建并发
    tasks = [asyncio.ensure_future(run()), asyncio.ensure_future(run1())]
    # 创建应用池
    loop = asyncio.get_event_loop()
    # done 完成的
    # deping 未完成的
    # timeout 最长等带时间
    done, deping = loop.run_until_complete(asyncio.wait(tasks, timeout=4))
    print("done:", done)
    print("deping:", deping)
    
    hello
    hello
    world
    done: {<Task finished name='Task-2' coro=<run1() done, defined at c:UsersAdministratorDesktopexam协程旧的版本.py:8> result=None>}
    deping: {<Task pending name='Task-1' coro=<run() running at c:UsersAdministratorDesktopexam协程旧的版本.py:5> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001EF104A8040>()]>>}
    

    1.2.在python3.7 之后

    将 手动创建 task 和 运行方式进行了修改

    使用: async.create_task() 来创建task 实例

    使用 : asyncio.run() 来运行

    示例:

    import asyncio
    
    async def run():
        print("hello")
        await asyncio.sleep(2)
        print("world")
        return 1
    
    async def run1():
        print("hello")
        await asyncio.sleep(5)
        print("world")
        
    # 主函数
    async def main():
        # 创建task, name 任务名称
        tasks = [ asyncio.create_task(run(), name="n1"), 
                 asyncio.create_task(run(), name="n2")]
        # done 已完成的
        # pending 未完成的
     	# 这一步主要是为了执行 tasks 里的---> 等待 task里的任务
        done, pending =  await asyncio.wait(tasks, timeout=None)
        return done, pending
    
    # 运行
    done, pending = asyncio.run( main() )
    print("done:", done)
    print("pending:", pending)
    
    C:UsersAdministratorDesktopexam>C:/Python39/python.exe "c:/Users/Administrator/Desktop/exam/协程/1. 简单示例3.4后.py"
    hello
    hello
    world
    world
    done: {<Task finished name='n1' coro=<run() done, defined at c:UsersAdministratorDesktopexam协程1. 简单示例3.4后.py:3> result=1>, <Task finished name='n2' coro=<run() done, defined at c:UsersAdministratorDesktopexam协程1.  
    简单示例3.4后.py:3> result=1>}
    pending: set()
    

    2.await ----->等待

    ​ await + 可等待对象 (协程对象, Future 对象, Task 对象) ---> IO 等待

    携程对象: ----> async 修饰的函数 + ()

    ​ 流程:

    ​ 如果有其他任务, 在遇到IO等待(await) 的时候去执行其他任务。

    示例1:

    import asyncio
    
    async def other():
        print("hello")
        await asyncio.sleep(3)
        print("world")
        return "返回值"
    asyncio.run(other())
    
    

    hello
    world

    示例2:

    import asyncio
    
    async def other():
        print("hello")
        await asyncio.sleep(3)
        print("world")
        return "返回值"
    
    
    async def func():
        print("执行协程函数内部代码")
        #respone = await func()
        task = asyncio.create_task(other())
        print("执行结束")
    
    asyncio.run(func())
    

    结果:

    执行协程函数内部代码
    执行结束
    hello

    在这里我们发现, 我们开始执行协程内部代码遇到内部的await 并没有等,而是直接结束了,

    示例3:

    import asyncio
    
    async def other():
        print("hello")
        await asyncio.sleep(3)
        print("world")
        return "返回值"
    
    
    async def func():
        print("执行协程函数内部代码")
        respone = await other()
        print("执行结束")
    
    asyncio.run(func())
    

    执行协程函数内部代码
    hello
    world
    执行结束

    我们发现这样只有等内部结束后才会执行,外部接下来的代码

    示例4:

    import asyncio
    
    async def other():
        print("hello")
        await asyncio.sleep(3)
        print("world")
        return "返回值"
    
    async def other2():
        print("other---->开始")
        await asyncio.sleep(5)
        print("other ----> 结束")
    
    async def func():
        print("执行协程函数内部代码")
        respone = await other()
        respone2 = await other2()
        print("执行结束")
    
    asyncio.run(func())
    

    执行协程函数内部代码
    hello
    world
    other---->开始
    other ----> 结束
    执行结束

    同样, await 结束后才执行下一个

    3.task

    ​ 在事件循环中并发的添加多个任务的。可以说解决了上面的 await 的问题。

    1. 使用asyncio.create_task()  创建 3.7 之后, 建议
    2. loop.create_task() 或者 asyncio.ensure_future() 来创建
    
    async def run():
        print("hello")
        await asyncio.sleep(2)
        print("world")
        return 1
    
    async def run1():
        print("hello")
        await asyncio.sleep(5)
        print("world")
        
        
    tasks = [run(), run1()]
    done, pending = asyncio.run( asyncio.wait(tasks) )
    print("done:", done)
    print("pending:", pending)
    
    import asyncio
    
    async def run():
        print("hello")
        await asyncio.sleep(2)
        print("world")
        return 1
    
    async def run1():
        print("hello")
        await asyncio.sleep(5)
        print("world")
        
    # 主函数
    async def main():
        # 创建task, name 任务名称
        tasks = [ asyncio.create_task(run(), name="n1"), 
                 asyncio.create_task(run1(), name="n2")]
        # 或者
        # tasks = [
        #    run(),
        #    run1()
        # ]
        # done 已完成的
        # pending 未完成的
     	# 这一步主要是为了执行 tasks 里的---> 等待 task里的任务
        done, pending =  await asyncio.wait(tasks, timeout=None)
        return done, pending
    
    # 运行
    done, pending = asyncio.run( main() )
    print("done:", done)
    print("pending:", pending)
    

    4. asyncio.Future 对象

    他是 task 的基类,一个更低级的接口帮助我们等待task的结果

    Task 继承 Future, Task 对象内部 await 结果的处理基于Future 对象来的。

    示例1:

    async def main():
        # 获取当前事件循环
        loop = asyncio.get_running_loop()
        # 创建一个任务(Future对象), 这个任务什么都不干
        fut = loop.create_future()
        # 这个测试中这里始终等不到对象
        await fut
    asyncio.run(main())
    

    示例2:

    import asyncio
    
    async def set_after(fut):
        await asyncio.sleep(2)
        # 给fut 设置结果
        fut.set_resukt("666")
    
    async def main():
        # 获取当前事件循环
        loop = asyncio.get_running_loop()
        # 创建一个任务(Future对象), 这个任务什么都不干
        fut = loop.create_future()
        await loop.create_task(set_after(fut))
        # 这里会等到set_after 为fut设置结果, 等待结束
        data = await fut
        print(data)
    asyncio.run(main())
    

    5. concurrent.futures.Future 对象

    使用线程池、进程池实现异步操作时用到的对象。

    帮助我们来获取结果

    https://docs.python.org/zh-cn/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor

    示例1:

    import asyncio
    import time
    import concurrent.futures
    
    def blocking_io():
        # File operations (such as logging) can block the
        # event loop: run them in a thread pool.
        print("hello io")
        time.sleep(5) # 这时候这里可以是任何需要等待结果的操作
        print("world io")
        return "1111"
    
    def cpu_bound():
        # CPU-bound operations will block the event loop:
        # in general it is preferable to run them in a
        # process pool.
        print("hello cpu")
        time.sleep(5)
        print("world cpu")
        return "2222"
    
    async def f(fun):
        # 重点, 以线程池或进程池的方式启动
        loop = asyncio.get_event_loop()
        # run_in_executor 的方式启动,则可变成可等待对象。
        result =  loop.run_in_executor(
            None, fun)
        # 等待结果
        response = await result
        print(response)
    
    async def main():
        task = [ f(blocking_io), f(cpu_bound)]
        await asyncio.wait(task)
    
    asyncio.run( main())
    
    
    # 注
    # get_event_loop  只会在主线程帮您创建新的 event loop
    # get_running_loop 用于获取当前正在运行的loop,如果当前主线程中没有正在运行的loop,如果没有就会报RuntimeError 错误。
    

    本来这种方式是不支持 async, 目前的实现结合了async 实际上是通过 线程池来实现的

    loop.run_in_executor( None, fun)

    第一个参数传None 使用默认, 在上边连接官网中有介绍

    同时也可以把 asyncio.run(main()) 改成

    task = [ f(blocking_io), f(cpu_bound)]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(task))
    

    6.异步上下文管理器

    此种对象通过定义 __aenter__()__aexit__()方法来对 async_with 语句中的环境进行控制。

    import asyncio
    
    class AsyncContentManager:
        def __init__(self):
            pass
        
        async def do_something(self):
            return 666
        
        async def __aenter__(self):
            # 连接数据库   
            return self
        
        async def __aexit__(self, exc_type, exc, tb):
            # 异步关闭数据库
            await asyncio.sleep(1)
    
    async def func():
        async with AsyncContentManager() as f:
            result = await f.do_something()
    asyncio.run( func() )
    

    7. uvloop

    ​ 是asyncio的事件循环的替代方案 > 默认asyncio的事件循环。

    pip install uvloop	
    
    import asyncio
    import uvloop
    
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    
    # 编写asyncio的代码,与之前写的代码一致
    # 内部的事件循环自动化会变成uvloop
    asyncio.run()
    
    

    8. 实操

    ​ 8.1 异步redis

    ​ 在python 操作redis, 连接/操作/断开都是网络IO

    pip install aioredis
    

    示例1:

    import asyncio
    import aioredis
    
    async def execute(address, passwd, test):
        """
        	@address: redis 连接地址
        	@passwd: redis 密码
        """
        print("开始执行", test)
        
        redis = await aioredis.create_redis_pool(address=address, password=passwd)
        # 网络io操作
        await redis.hmset_dict("car", key1=1, key2=2, key3=3)
        
        # 网络io操作、遇到io自动切换任务
        result = await redis.hgetall("car", encoding="utf-8")
        print(result)
        
        redis.close()
        #  网络io操作, 遇到io会自动切换任务
        await redis.wait_closed()
        print("结束", test)
    
    tasks = [
        execute("redis://127.0.0.1:6379", "295213", "测试1"),
        execute("redis://127.0.0.1:6379", "295213", "测试2")
    ]
    
    asyncio.run(asyncio.wait(tasks))
    

    结果

    开始执行 测试2
    开始执行 测试1
    {'key1': '1', 'key2': '2', 'key3': '3'}
    {'key1': '1', 'key2': '2', 'key3': '3'}
    结束 测试2
    结束 测试1
    
    

    8.2 异步mysql

    pip install aiomysql
    

    示例1:

    import asyncio
    import aiomysql
    
    async def execute(host, password, test):
        print("开始", test)
        # 建立连接
        conn = await aiomysql.connect(host=host, port=3306, user="root", password=password, db="work")
        # 创建指针
        cur = await conn.cursor()
        
        # 查询
        await cur.execute("select * from users limit 1")
        data = await cur.fetchall()
        print(data)
        await cur.close()
        conn.close()
        print("结束", test)
        
    tasks = [
        execute("127.0.0.1", "295213", "测试1"),
        execute("xxx.xx.xx.xx", "295213", "测试2")
    ]
    
    asyncio.run(asyncio.wait(tasks))
    
  • 相关阅读:
    129 01 Android 零基础入门 02 Java面向对象 06 Java单例模式 03 饿汉模式 VS 懒汉模式 02 懒汉式的代码实现
    128 01 Android 零基础入门 02 Java面向对象 06 Java单例模式 03 饿汉模式 VS 懒汉模式 01 饿汉式的代码实现
    127 01 Android 零基础入门 02 Java面向对象 06 Java单例模式 02 单例模式概述 01 单例模式的定义和作用
    126 01 Android 零基础入门 02 Java面向对象 06 Java单例模式 01 设计模式概述 01 设计模式简介
    125 01 Android 零基础入门 02 Java面向对象 05 Java继承(下)05 Java继承(下)总结 01 Java继承(下)知识点总结
    leetcode-----121. 买卖股票的最佳时机
    leetcode-----104. 二叉树的最大深度
    Json串的字段如果和类中字段不一致,如何映射、转换?
    Mybatis-Plus的Service方法使用 之 泛型方法default <V> List<V> listObjs(Function<? super Object, V> mapper)
    模糊查询
  • 原文地址:https://www.cnblogs.com/ShanCe/p/14550146.html
Copyright © 2011-2022 走看看