zoukankan      html  css  js  c++  java
  • python并发编程之asyncio协程(三)

    协程实现了在单线程下的并发,每个协程共享线程的几乎所有的资源,除了协程自己私有的上下文栈;协程的切换属于程序级别的切换,对于操作系统来说是无感知的,因此切换速度更快、开销更小、效率更高,在有多IO操作的业务中能极大提高效率。

    系列文章

    asyncio模块创建协程

    asyncio在python3.4后被内置在python中,使得python的协程创建变得更加方便。

    import asyncio
    import os
    
    # async 关键字定义一个协程
    async def target_func1():
        print('the func start')
        print(os.getpid())
        print('the func end')
    
    def run():
        # 创建一个协程对象
        coroutine = target_func1()
        # 创建一个事件循环
        loop = asyncio.get_event_loop()
        loop.run_until_complete(coroutine) # 将协程对象添加到事件循环,运行直到结束
        print(os.getpid())
        loop.close() # 关闭事件循环
    
    def run1():
        # 创建一个事件循环
        loop = asyncio.get_event_loop()
        # 创建一个协程对象
        coroutine = target_func1(loop)
        loop.create_task(coroutine) # 创建一个任务并添加到事件循环中
        loop.run_forever()  # 开启无限循环,需要在异步函数中调用stop()使停止
        loop.close()
    
    if __name__ == '__main__':
        run()
    
    # 结果
    the func start
    4876
    the func end
    4876
    

    以上可知,所有的代码段都是在一个进程的单线程中执行。

    asyncio模块分析

    • coroutine

    被async修饰的函数调用后会生成协程函数,可以通过send唤醒执行。

    async def target_func1():
        print('the func start')
        print(os.getpid())
        print('the func end')
    coroutine = target_func1()
    
    try:
        coroutine.send(None) # 唤醒协程
    except StopIteration:
        print('xx')
        coroutine.close() # 关闭
    
    • async

    async关键字可以定义一个协程对象,被async修饰的函数变成了一个协程对象而不是一个普通的函数。

    async def target_func1():
        pass
    
    coroutine = target_func1()
    print(coroutine)
    
    • await

    await用于控制事件的执行顺序,它只能在异步函数中使用,即被async关键字定义的协程函数,否则报错。当执行到await时,当前协程挂起,转而去执行await后面的协程,完毕后再回到当前协程继续往下。

    # async 关键字定义一个协程
    async def target_func1():
        print('the func start')
        x = await target_func2() # 当前协程挂起
        print(x)
        print('the func end')
        return 1
    
    async def target_func2():
        """
        目标函数2
        :return:
        """
        time.sleep(2)
        print('the func end2')
        return 0
    
    • 主要方法
    asyncio.get_event_loop():创建一个事件循环,所有的异步函数都需要在事件循环中运行;
    asyncio.ensure_future():创建一个任务
    asyncio.gather(*fs):添加并行任务
    asyncio.wait(fs):添加并行任务,可以是列表
    loop.run_until_complete(func):添加协程函数同时启动阻塞直到结束
    loop.run_forever():运行事件无限循环,直到stop被调用
    loop.create_task():创建一个任务并添加到循环
    loop.close():关闭循环
    loop.time():循环开始后到当下的时间
    loop.stop():停止循环
    loop.is_closed() # 判断循环是否关闭
    loop.create_future():创建一个future对象,推荐使用这个函数而不要直接创建future实例
    loop.call_soon() # 设置回调函数,不能接受返回的参数,需要用到future对象,立即回调
    loop.call_soon_threadsafe() # 线程安全的对象
    loop.call_later() # 异步返回后开始算起,延迟回调
    loop.call_at() # 循环开始多少s回调
    loop.call_exception_handler() # 错误处理
    
    • 主要的类

    Future:主要用来保存任务的状态;
    Task:Future的子类,扩展了Future的功能;

    # Future
    from asyncio import Future
    # future = Future()
    # future.result() # 获取任务的结果
    # future.remove_done_callback(fn) # 删除所有的回调函数并返回个数
    # future.set_result('result') # 设置任务的结果,必须在result()之前执行,否则报错
    # future.exception() # 获取任务的错误信息
    # future.set_exception('bad') # 设置任务的错误信息
    # future.add_done_callback('fn') # 添加回调函数
    
    # Task
    current_task():返回循环当前的任务,类方法
    all_tasks():返回事件循环所有的任务
    get_stack():获取其他协程的堆栈列表
    print_stack:输出其他协程的堆栈列表
    cancel:取消任务
    

    实例

    • 添加多个任务到事件循环
    async def target_func3(name):
        """
        :return:
        """
        await asyncio.sleep(1)
        print(name)
        return 0
    
    def run1():
        # 创建一个事件循环
        loop = asyncio.get_event_loop()
        x = loop.run_until_complete(asyncio.gather(target_func3('A'),target_func3('B'),target_func3('C'),))
        print(x) # 等待返回结果,一个列表,按照事件添加的顺序,但是计算的顺序是不定的
        loop.close()
    
    if __name__ == '__main__':
        run1()
    
    • 使用run_forever启动循环获取异步计算结果

    run_forever()不能直接得到异步函数的返回结果,需要使用Future类来作为第三方保存结果,同时设置回调函数;

    from asyncio import Future
    from functools import partial
    
    async def target_func0(name, future):
        """
        目标函数2
        :return:
        """
        time.sleep(1)
        print(name)
        future.set_result(name) # 设置返回结果
    
    def got_result(loop, future):
        print(future.result()) # 处理结果
        loop.stop() # 循环停止
    
    def run():
        loop = asyncio.get_event_loop()
        future = Future(loop=loop)
        res = asyncio.ensure_future(target_func0('A', future)) # 生成一个Task任务
        print(res)
        future.add_done_callback(partial(got_result, loop)) # 回调函数默认只能有一个参数future,必须使用偏函数
        # print(future.result()) # future上下文必须先调用future.set_result。
        loop.run_forever()
        loop.close()
    
    if __name__ == '__main__':
        run()
    
    • 链协程

    协程里调用等待另外的协程完成后才能返回。

    import asyncio
    import time
    # async 关键字定义一个协程
    async def target_func1():
        print('the func start')
        x = await target_func2() # 等待协程完成,控制执行顺序
        print(x)
        print('the func end')
        return 1
    
    async def target_func2():
        """
        目标函数2
        :return:
        """
        time.sleep(2)
        print('the func end2')
        return 0
    
    def run1():
        # 创建一个事件循环
        loop = asyncio.get_event_loop()
        x = loop.run_until_complete(target_func1())
        print(x)
        loop.close()
    if __name__ == '__main__':
        run()
    
    • 普通回调实例
    import asyncio
    import time
    from functools import partial
    
    # async 关键字定义一个协程
    async def target_func1():
        print('the func end')
        return 1
    
    def get_res(loop):
        print('xxxx')
        loop.stop()
    
    def run1():
        # 创建一个事件循环
        loop = asyncio.get_event_loop()
        loop.create_task(target_func1())
        # loop.call_soon(partial(get_res, loop)) # 设置回调函数,不能接受返回的参数,需要用到future对象
        # loop.call_soon_threadsafe() # 线程安全的对象
        # loop.call_later(delay=5, callback=partial(get_res, loop)) # 异步返回后开始算起,延迟5秒回调
        # loop.call_at(when=8000,callback=partial(get_res, loop)) # 循环开始第8秒回调
        # loop.call_exception_handler() # 错误处理
        loop.run_forever()
        loop.close()
    
    if __name__ == '__main__':
        run1()
    

    本地IO和网络IO的异步使用

    使用协程的目的是在系统发生io阻塞的时候,可以交出CUP的控制权,让其去执行其他的任务。实际使用时一般的场景有本地IO和网络IO。

    • 网络IO
    # 使用asyncio+aiohttp,如果想异步化,网络请求需要抛弃requests包
    import asyncio
    import time
    from aiohttp import ClientSession
    
    async def target2():
        print('start2')
        async with ClientSession() as session:
            async with session.get(url='http://www.baidu.com') as rsp:
                data = await rsp.read()
        print('end2')
        return data
    
    def run1():
        # 创建一个事件循环
        loop = asyncio.get_event_loop()
        tasks = [target2() for i in range(100)]
        ts = asyncio.gather(*tasks)
        t = time.time()
        loop.run_until_complete(ts)
        print(time.time()-t)
        loop.close()
    
    if __name__ == '__main__':
        run1()
    
    • 本地io

    核心思想:将文件读写的while循环换成事件循环。

    可参考:https://github.com/lyyyuna/script_collection/blob/master/aysncfile/asyncfile.py

    协程Queue

    asyncio模块也有自己的queue实现生产消费模式,只要有三种队列:Queue(先进先出),PriorityQueue(优先级队列),LifoQueue(栈),但是Queue不是线程安全的类,也就是说在多进程或多线程的情况下不要使用这个队列。

    import asyncio
    import time
    from asyncio import Queue
    
    # async 关键字定义一个协程
    async def target_func1(q:Queue):
        for i in range(100):
            await q.put(i)
    
    async def target_func2(q:Queue):
        for i in range(100):
            x = await q.get()
            print(x)
    
    def run1():
        # 创建一个事件循环
        loop = asyncio.get_event_loop()
        q = Queue(100)
        task = asyncio.gather(target_func1(q), target_func2(q))
        loop.run_until_complete(task)
        loop.close()
    
    if __name__ == '__main__':
        run1()
    

    Queue的get(),join(),put()方法返回的都是协程,需要使用await关键字。

  • 相关阅读:
    一个简单的jsp自定义标签
    js正则表达式学习
    java获取当前日期等以及时区
    java日期处理SimpleDateFormat等
    一个炫酷的导航菜单,模仿别人写的
    后台管理界面自己写,模仿,更新中...
    信息收集-主机综合扫描工具的使用
    ms10_046_shortcut_icon_dllloader漏洞利用和ettercap dns欺骗
    如何成为一名黑客
    msf常用命令
  • 原文地址:https://www.cnblogs.com/cwp-bg/p/9590700.html
Copyright © 2011-2022 走看看