zoukankan      html  css  js  c++  java
  • asyncio:异步I/O、事件循环和并发工具(持续跟新中)

    流畅的Python书中的协程部分版本太低,而且讲的比较少,这次根据Python3标准库书中实例来学习记录asyncio的使用。

    asyncio模块提供了使用次饿成构建并发应用的工具。threading模块通过应用线程并发,mutilprocessing使用系统进程实现并发,asyncio则使用一个单线程单进程的方法来实现并发,应用的各个部分会彼此合作,在最优的时刻显式地切换任务。

    asyncio提供的框架以一个事件循环(event loop)为中心,这是一个首类对象,负责高效地处理I/O事件、系统事件、和应用的上下文切换。

    await 都可以直接激活运行协程或future、task

    但future与task里买包含了更多的属性,有

    'add_done_callback', 'all_tasks', 'cancel', 'cancelled',

    'current_task', 'done', 'exception', 'get_loop', 'get_stack',

    'print_stack', 'remove_done_callback', 'result', 'set_exception', 'set_result'

    所以future与task的功能更加加强,可以使用ensure_future或者loop.create_task将协程装饰成task

    task时future的子集,也可以直接通过asyncio.Future创建future

    就现在本人的了解,一般的io应用中,还是以多线程使用为主,自己在写协程并发的时候,多个协程之间,无法有效的设置条件交出控制权。

    唯一能应用的包,就一个aiohttp,如果我想用另外的包实现协程,基本无法做到,然而协程asyncio包就像流畅的Python书中所说,大部分在讲概念和API,

    由于协程的让出控制权让出,能适配的包很多,所以一般在示例中只能通过asyncio.sleep来模拟单携程让出控制权

    也只能希望后面能有更加丰富的包来配合协程,或者等我哪一天成为高手,写出能配合协程的包。

    2、利用协程合作完成多任务

    import asyncio
    
    
    async def coroutine():
        print('in coroutine')
    
    # 定义事件循环
    event_loop = asyncio.get_event_loop()
    
    try:
        print('starting coroutine')
        coro = coroutine()
        print('entering event loop')
        # 运行协程
        event_loop.run_until_complete(coro)
    finally:
        print('closing event loop')
        event_loop.close()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_coroutine.py
    starting coroutine
    entering event loop
    in coroutine
    closing event loop
    
    Process finished with exit code 0
    

     先通过get_event_loop创建一个默认的事件循环,run_until_complete方法驱动里面的协程,最后关闭事件循环。

    从协程返回值

    import asyncio
    
    
    # 有返回值
    async def coroutine():
        print('in coroutine')
        return 'result'
    
    # 定义事件循环
    event_loop = asyncio.get_event_loop()
    
    try:
        print('starting coroutine')
        coro = coroutine()
        print('entering event loop')
        # 运行协程,获取值
        return_value = event_loop.run_until_complete(coro)
        print(f'it returned: {return_value!r}')
    finally:
        print('closing event loop')
        event_loop.close()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_coroutine_return.py
    starting coroutine
    entering event loop
    in coroutine
    it returned: 'result'
    closing event loop
    
    Process finished with exit code 0
    

    串链协程

    import asyncio
    
    
    async def outer():
        print('in outer')
        print('waiting for result1')
        # 接收phase1()协程产生的值
        result1 = await phase1()
        print('waiting for result2')
        # 接收phase2()协程产生的值
        result2 = await phase2(result1)
        return result1, result2
    
    
    async def phase1():
        print('in parse1')
        return 'result1'
    
    async def phase2(arg):
        print('in phase2')
        return 'result2 derived from {}'.format(arg)
    
    
    event_loop = asyncio.get_event_loop()
    try:
        return_value = event_loop.run_until_complete(outer())
        print(f'return_value:{return_value!r}')
    finally:
        event_loop.close()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_coroutine_chain.py
    in outer
    waiting for result1
    in parse1
    waiting for result2
    in phase2
    return_value:('result1', 'result2 derived from result1')
    
    Process finished with exit code 0
    

    生成器而不是协程

    Python3.5开始

    async 代替了@asyncio.coroutine

    await 代替了 yield from

    本人觉得await 还是yield from更加直观。

    3、调度常规函数调用

    这个使用的话,需要把事件循环传递放入协程中,协程中通过事件循环的方法去激活需要调用的函数。

    在传入事件循环的协程里面必须设置asyncio.seleep,要不然协程不会让出控制权,事件循环根本无法激活调用函数。

    import asyncio
    from functools import partial
    
    
    def callback(arg, *, kwarg='default'):
        print(f'callback invoked with {arg} and {kwarg}')
    
    
    async def main(loop):
        print('registering callbacks')
        # 调用函数,只能传入一个参数,多参数传入调用partial
        loop.call_soon(callback, 1)
        # 通过partial传入关键字参数
        wrapped = partial(callback, kwarg='not default')
        loop.call_soon(wrapped, 2)
    
        await asyncio.sleep(.1)
    
    
    event_loop = asyncio.get_event_loop()
    try:
        print('entering event loop')
        event_loop.run_until_complete(main(event_loop))
    finally:
        event_loop.close()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_call_soon.py
    entering event loop
    registering callbacks
    callback invoked with 1 and default
    callback invoked with 2 and not default
    
    Process finished with exit code 0
    

    用Delay调度回调

    call_soon是直接调用,call_later()第一个参数可以传入延后的事件,单位为秒,第二个参数为function

    在传入事件循环的协程里面必须设置asyncio.seleep,且大于最迟的调用函数时间,要不然协程不会让出控制权,事件循环根本无法激活调用函数。

    import asyncio
    from functools import partial
    
    
    def callback(arg):
        print(f'callback invoked {arg}')
    
    
    async def main(loop):
        print('registering callbacks')
        # 调用函数,第一个参数传入时间
        loop.call_later(0.2, callback, 1)
        loop.call_later(0.1, callback, 2)
        # 这个很重要
        await asyncio.sleep(.21)
    
    
    event_loop = asyncio.get_event_loop()
    try:
        print('entering event loop')
        event_loop.run_until_complete(main(event_loop))
    finally:
        print('closing event loop')
        event_loop.close()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_call_later.py
    entering event loop
    registering callbacks
    callback invoked 2
    callback invoked 1
    closing event loop
    
    Process finished with exit code 0
    

    在指定事件内调度一个回调

    实现这个目的的循环依赖的是一个所谓的单调时钟,用loop.time()生成,激活用call_at

    import asyncio
    import time
    
    
    def callback(n, loop):
        print(f'callback {n} invoked at {loop.time()}')
    
    
    async def main(loop):
        # 运行结果来看,这now是从0开始的
        now = loop.time()
        print(f'clock time: {time.time()}')
        print(f'loop  time {now}')
        print('registering callbacks')
        # 加0.2秒
        loop.call_at(now + .2, callback, 1, loop)
        # 加0.1 秒
        loop.call_at(now + .1, callback, 2, loop)
        # 马上开始
        loop.call_soon(callback, 3, loop)
    
        await asyncio.sleep(1)
    
    
    event_loop = asyncio.get_event_loop()
    try:
        print('entering event loop')
        event_loop.run_until_complete(main(event_loop))
    finally:
        print('closing event loop')
        event_loop.close()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asynvio_call_at.py
    entering event loop
    clock time: 1579366496.336677
    loop  time 0.086557153
    registering callbacks
    callback 3 invoked at 0.086701756
    callback 2 invoked at 0.189241196
    callback 1 invoked at 0.29037571
    closing event loop
    
    Process finished with exit code 0
    

    4、异步地生成结果。

    Future表示还未完成的工作结果。事件循环可以通过监视一个Future对象的状态来指示它已经完成,从而允许应用的一部分等待另一部分完成一些工作。

    从我的理解与操作来看,如果future获取函数中的值,需要通过event_loop.calll_soon调用函数,函数中传入future,通过future.set_result方法保存参数。

    import asyncio
    
    
    def mark_done(future, result):
        print('setting future result to {!r}'.format(result))
        future.set_result(result)
    
    event_loop = asyncio.get_event_loop()
    try:
        # 创建future对象
        all_done = asyncio.Future()
        print('scheduling mark_done')
        # 调度函数,函数中传入future
        event_loop.call_soon(mark_done, all_done, 'the result')
    
        print('entering event_loop')
        # 驱动future对象,并获取值。
        result = event_loop.run_until_complete(all_done)
        print('returned result: {!r}'.format(result))
    finally:
        print('closing event_loop')
        event_loop.close()
    print('future result: {!r}'.format(all_done.result()))
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_future_event_loop.py
    scheduling mark_done
    entering event_loop
    setting future result to 'the result'
    returned result: 'the result'
    closing event_loop
    future result: 'the result'
    
    Process finished with exit code 0
    

     一般我们获取future的值可以通过await

    实际感受,这种写法更加让人理解。

    import asyncio
    
    
    def mark_down(future, result):
        print('setting future result to {!r}'.format(result))
        future.set_result(result)
    
    async def main(loop):
        # 这种逻辑比较清楚,创建协程,协程里面创建future,带入函数中。
        all_done = asyncio.Future()
        print('scheduling mark done')
        loop.call_soon(mark_down, all_done, 'the result')
        # 通过await获取future的值
        result = await all_done
        print('returned result: {!r}'.format(result))
        return result
    
    
    event_loop = asyncio.get_event_loop()
    try:
        # 驱动协程,并获取协程的返回值。
        result = event_loop.run_until_complete(main(event_loop))
        print(f'last result is {result!r}')
    finally:
        event_loop.close()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_future_await.py
    scheduling mark done
    setting future result to 'the result'
    returned result: 'the result'
    last result is 'the result'
    
    Process finished with exit code 0
    

    Future回调

    这个更像是通过future对象传递参数给函数,好比生成器里面的send

    很多时候,把await看成yiled from 更加能够让我理解。

    import asyncio
    import functools
    
    
    def callback(future, n):
        print('{}: future done: {}'.format(n, future.result()))
    
    
    async def register_callbacks(all_done):
        print('register callbacks on future')
        all_done.add_done_callback(functools.partial(callback, n=1))
        all_done.add_done_callback(functools.partial(callback, n=2))
    
    async def main(all_done):
        # 预激回调协程
        await register_callbacks(all_done)
        print('setting result of future')
        # 传入参数
        all_done.set_result('the result')
    
    event_loop = asyncio.get_event_loop()
    try:
        all_done = asyncio.Future()
        # 启动协程
        event_loop.run_until_complete(main(all_done))
    finally:
        event_loop.close()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_future_callback.py
    register callbacks on future
    setting result of future
    1: future done: the result
    2: future done: the result
    
    Process finished with exit code 0
    

    5、并发的执行任务

    任务的启动可以直接通过await来启动,或者在多个任务之间,有任务await asyncio.sleep,那其他任务马上就可以拿到控制权。

    任务是与事件循环交互的主要途径之一。任务可以包装成协程,并跟踪协程何时完成。由于任务是Future的子类,所以其他协程可以等待任务,而且每个任务可以有一个结果,在它完成之后可以获取这个结果

    启动一个任务

    要启动一个任务,可以使用时间循环.create_task()创建一个Task实例。

    import asyncio
    
    
    async def task_func():
        print('in task_func')
        return 'the result'
    
    async def main(loop):
        print('creating task')
        # 创建一个任务
        task = loop.create_task(task_func())
        # task = [loop.create_task(task_func()) for i in range(1000)]
        print('waiting for {!r}'.format(task))
        # 直接运行任务,并取回返回值,一般用await取回返回值
        return_value = await task
        print('task_completed {!r}'.format(task))
        print('return value: {!r}'.format(return_value))
    
    
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(main(event_loop))
    finally:
        event_loop.close()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_create_task.py
    creating task
    waiting for <Task pending coro=<task_func() running at /Users/shijianzhong/study/t_asyncio/asyncio_create_task.py:4>>
    in task_func
    task_completed <Task finished coro=<task_func() done, defined at /Users/shijianzhong/study/t_asyncio/asyncio_create_task.py:4> result='the result'>
    return value: 'the result'
    
    Process finished with exit code 0
    

     从上面可以看出,task运行与没运行的状态是不一样的。

    取消一个任务

    import asyncio
    
    
    async def task_func():
        print('in task_func')
        return 'the result'
    
    async def main(loop):
        print('creating task')
        task = loop.create_task(task_func())
        # 取消任务
        print('canceling task')
        task.cancel()
    
        print('canceled task {!r}'.format(task))
        try:
            await task
        # 获取错误
        except asyncio.CancelledError:
            print('caught error from canceled task')
        else:
            print('task result: {!r}'.format(task.result()))
    
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(main(event_loop))
    finally:
        event_loop.close()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_cancel_task.py
    creating task
    canceling task
    canceled task <Task cancelling coro=<task_func() running at /Users/shijianzhong/study/t_asyncio/asyncio_cancel_task.py:4>>
    caught error from canceled task
    
    Process finished with exit code 0
    

    并发取消一个正在执行的任务。

    import asyncio
    
    
    async def task_func():
        print('in task_func, sleeping')
        try:
            # 让出控制权,执行loop.call_soon
            await asyncio.sleep(1)
        except asyncio.CancelledError:
            print('task_func was canceled')
            raise
        return 'the result'
    
    
    def task_canceller(t):
        print('in task_canceller')
        t.cancel()
        # print(t)
        print('canceled the task')
    
    
    async def main(loop):
        print('create task')
        task = loop.create_task(task_func())
        # 在协程中并不是第一时间执行,需要等待控制权。
        loop.call_soon(task_canceller, task)
        print(f'task status is: {task} ')
        try:
            # 执行 task,让出控制权,给call_soon
            res = await task
            print(res,'is here')
        except asyncio.CancelledError:
            print('main() also sees task as canceled')
    
    
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(main(event_loop))
    finally:
        event_loop.close()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_cancel_task2.py
    create task
    task status is: <Task pending coro=<task_func() running at /Users/shijianzhong/study/t_asyncio/asyncio_cancel_task2.py:4>> 
    in task_func, sleeping
    in task_canceller
    canceled the task
    task_func was canceled
    main() also sees task as canceled
    
    Process finished with exit code 0
    

    从协程创建任务。

    import asyncio
    
    # await不仅可以启动协程,还可以启动任务。
    
    async def wrapper():
        print('wrapped')
        return 'result'
    
    
    async def inner(task):
        print('inner: starting')
        print('inner:waiting for {!r}'.format(task))
        # 启动task任务
        result = await task
        print('inner: task returned {!r}'.format(result))
        return result
    
    
    async def starter():
        print('starter: creating task')
        # 创建任务
        task = asyncio.ensure_future(wrapper())
        print('starter:waiting for inner')
        # 启动inner协程
        result = await inner(task)
        print('starter: inner returned')
        return result
    event_loop = asyncio.get_event_loop()
    try:
        print('entering event loop')
        result = event_loop.run_until_complete(starter())
        print('last res is {}'.format(result))
    finally:
        event_loop.close()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_ensure_future.py
    entering event loop
    starter: creating task
    starter:waiting for inner
    inner: starting
    inner:waiting for <Task pending coro=<wrapper() running at /Users/shijianzhong/study/t_asyncio/asyncio_ensure_future.py:5>>
    wrapped
    inner: task returned 'result'
    starter: inner returned
    last res is result
    
    Process finished with exit code 0
    

    6、组合协程和控制结构

    一系列协程之间的线性控制流用内置关键字await可以很容易的管理。更复制的结构可能允许一个协程等待多个其他协程并行完成,可以使用asyncio中的工具创建这些更复杂的结构。

    等待多个携程

    import asyncio
    
    
    async def phase(i):
        print('in phase {}.'.format(i))
        await asyncio.sleep(0.1 * i)
        print('done with phase {}'.format(i))
        return 'phase {} result'.format(i)
    
    async def main(num_phase):
        print('starting main')
        # 携程队列
        phases = [
            phase(i)
            for i in range(num_phase)
        ]
        print('wait for phases to complete')
        # 接收携程队列,complete适完成的,pending是未完成的
        completed, pending = await asyncio.wait(phases)
        # 从完成的携程里面取值
        results = [t.result() for t in completed]
        print('results: {!r}'.format(results))
    
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(main(3))
    finally:
        event_loop.close()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_wait.py
    starting main
    wait for phases to complete
    in phase 1.
    in phase 2.
    in phase 0.
    done with phase 0
    done with phase 1
    done with phase 2
    results: ['phase 2 result', 'phase 0 result', 'phase 1 result']
    
    Process finished with exit code 0
    

    在wait内设定一个超时值,超过这个事件pending里面保留未完成的协程

    import asyncio
    
    
    async def phase(i):
        print('in phase {}'.format(i))
        try:
            await asyncio.sleep(0.1 * i)
        except asyncio.CancelledError:
            print('phase {} canceled'.format(i))
            raise
        else:
            print('done with phase {}'.format(i))
            return 'phase {} result'.format(i)
    
    
    async def main(num_phases):
        print('starting main')
        phases = [
            phase(i)
            for i in range(num_phases)
        ]
        print('waiting 0.1 for phase to complete')
        # 设定超时时间,为完成的携程进入pending
        completed, pending = await asyncio.wait(phases, timeout=0.1)
    
        print('{} completed and {} pending'.format(len(completed), len(pending)))
        # 把未完成的协程取消了
        if pending:
            print('canceling tasks')
        [t.cancel() for t in pending]
        print('exiting main')
    
    
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(main(3))
    finally:
        event_loop.close()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_wait_timeout.py
    starting main
    waiting 0.1 for phase to complete
    in phase 1
    in phase 2
    in phase 0
    done with phase 0
    1 completed and 2 pending
    canceling tasks
    exiting main
    phase 1 canceled
    phase 2 canceled
    
    Process finished with exit code 0
    

    从协程收集结果

    import asyncio
    
    
    async def phase1():
        print('in phase1')
        await asyncio.sleep(2)
        print('done with phase1')
        return 'phases1 result'
    
    
    async def phase2():
        print('in phase2')
        await asyncio.sleep(1)
        print('done with phase2')
        return 'phase2 result'
    
    async def main():
        print('starting main')
        print('waiting for phases to complete')
        # 这个直接收集返回值,按照传入的协程顺序排列
        result = await asyncio.gather(phase1(), phase2())
        print('results: {!r}'.format(result))
    
    
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(main())
    finally:
        event_loop.close()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/async_gather.py
    starting main
    waiting for phases to complete
    in phase1
    in phase2
    done with phase2
    done with phase1
    results: ['phases1 result', 'phase2 result']
    
    Process finished with exit code 0
    

    后台操作完成时进行处理。

    运用了as_completed方法,那个完成早返回哪个协程

    import asyncio
    
    async def phase(i):
        print('in phase {}'.format(i))
        await asyncio.sleep(.5 - (.1 * i))
        print('done with phase {}'.format(i))
        return 'phase {} result'.format(i)
    
    
    async  def main(num_phases):
        print('starting main')
        phases = [
            phase(i)
            for i in range(num_phases)
        ]
        print('waiting for phases to completed')
        results = []
        # 参数里面可以填写超时事件
        for next_to_complete in asyncio.as_completed(phases):
            # 协程没有result属性,future才有
            answer = await next_to_complete
            print('received answer {!r}'.format(answer))
            results.append(answer)
        print('results: {!r}'.format(results))
        return results
    
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(main(3))
    finally:
        event_loop.close()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/async_gather.py
    starting main
    waiting for phases to complete
    in phase1
    in phase2
    done with phase2
    done with phase1
    results: ['phases1 result', 'phase2 result']
    
    Process finished with exit code 0
    

    7、同步原语

    待续

  • 相关阅读:
    exchange 2013 versions
    Outlook Web App Customization
    Using the FullCalendar Plugin in SharePoint 2013 Apps
    erlide的调试设置
    MySQLProxy
    Flex Socket安全策略<policyfilerequest/>及应对方法
    一些flex教程
    CentOS apache 配置
    C++与Flex之间socket通信policyfilerequest解决方案
    免费使用Adobe Flash Builder 4.5方法
  • 原文地址:https://www.cnblogs.com/sidianok/p/12210857.html
Copyright © 2011-2022 走看看