zoukankan      html  css  js  c++  java
  • python随用随学20200220-异步IO

    啥是异步IO

    众所周知,CPU速度太快,磁盘,网络等IO跟不上. 而程序一旦遇到IO的时候,就需要等IO完成才能进行才能进行下一步的操作. 严重拖累了程序速度.

    因为一个IO操作就阻塞了当前线程,导致其他代码无法执行,所以我们必须使用多线程或者多进程来并发的执行代码.

    但是多线程或者多进程虽然解决了并发问题. 但是线程的增加,系统切换线程的开销也会变大. 如果线程太多,CPU的时间就花在了频繁切换线程上.(为啥会有开销,如果不懂的话,请看计算机专业本科教材,操作系统原理)

    所以除了多线程和多进程之外,还有一个办法就是异步IO. 也就是传说中的消息订阅机制.
    进程发出IO请求后就不管了,然后去干其他的事儿. 等IO返回消息之后再去处理.

    如果采用异步IO的话,我们平常的这种顺序执行的代码就不好使了,需要有一个一直在监听事件的消息循环. 一般情况下,我们会使用一个无限循环来进行监听. 是的你没有看错,就是个死循环.

    在异步模式下,没有发生频繁的线程切换. 对于IO密集型的场景,异步IO非常合适.

    协程

    协程,又叫微线程,coroutine.函数在所有语言中都是层级调用,比如A中调用了B,B中又调用了C. 那么C执行完返回,B执行完返回,A执行完.结束.
    所以函数的调用都是通过栈来实现的.一个线程执行的就是一个函数.

    函数的调用都会有一个入口和一个返回. 所以函数调用的顺序都是明确的. 而协程的调用和函数的调用完全不同.

    协程可以在执行的时候中断,然后再在合适的时候返回来接着执行.

    协程从执行结果上来看,有点像多线程,但是其优点在于没有发生线程的切换,所以其效率是远高于多线程的. 而且不需要锁...

    python中的协程是通过generator来实现的. Python中的yeild不但可以返回一个值,还可以接收调用者发出的参数.

    1. def consumer(): 
    2. r = '' 
    3. while True: 
    4. n = yield r 
    5. if not n: 
    6. return 
    7. print('[CONSUMER] Consuming %s...' % n) 
    8. r = '200 ok' 
    9.  
    10. def produce(c): 
    11. c.send(None) 
    12. n = 0 
    13. while n < 5: 
    14. n = n + 1 
    15. print('[PRODUCER] Producing %s' % n) 
    16. r = c.send(n) 
    17. print('[PRODUCER] Consumer return %s' % r) 
    18. c.close() 
    19.  
    20. c = consumer() 
    21. produce(c) 
    22.  

    廖雪峰的代码,应该很好看懂. 不多做解释了.

    协程的使用

    asyncio是python3.4之后的版本引入的标准库,内置了对异步IO的支持.

    asyncio就是一个消息循环模型. 我们从asyncio模块中获取一个eventloop然后把协程丢到这个eventloop中,就可以实现异步IO

    1. import asyncio 
    2.  
    3. @asyncio.coroutine 
    4. def hello(): 
    5. print('hello world') 
    6. r = yield from asyncio.sleep(2) 
    7. print('hello world again!') 
    8.  
    9. loop = asyncio.get_event_loop() 
    10. loop.run_until_complete(hello()) 
    11. loop.close() 
    12.  
    1. import threading 
    2. import asyncio 
    3.  
    4. @asyncio.coroutine 
    5. def hello(): 
    6. print('hello world! (%s)' % threading.current_thread()) 
    7. yield from asyncio.sleep(2) 
    8. print('hello world again (%s)' % threading.current_thread()) 
    9.  
    10. loop = asyncio.get_event_loop() 
    11. task = [hello(), hello()] 
    12. loop.run_until_complete(asyncio.wait(task)) 
    13. loop.close() 
    14.  

    这里就可以看出来,两个hello()函数是由同一个线程并发执行的.

    coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
    Run awaitable objects in the aws set concurrently and block until the condition specified by return_when.
    If any awaitable in aws is a coroutine, it is automatically scheduled as a Task. Passing coroutines objects to wait() directly is deprecated as it leads to confusing behavior.
    这是里关于wait方法的定义. 说实话没搞太懂...

    async和await

    构造协程并使用asyncio的两种方法. 第一种就是上面说说的. 使用修饰符@asyncio.coroutine 并在协程内使用yield from

    另外在python3.5版本后新增了asyncawait关键字. 使用也很简单,用async替代修饰符@asyncio.coroutine 使用await替换yield from

    coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
    Run awaitable objects in the aws set concurrently and block until the condition specified by return_when.Returns two sets of Tasks/Futures: (done, pending).
    另外这个方法在python3.8之后被deprecated了.

    定义一个协程

    1. import asyncio 
    2. import time 
    3.  
    4. async def so_some_work(x): 
    5. print('waiting: ',x) 
    6.  
    7. now = lambda :time.time() 
    8.  
    9. start = now() 
    10.  
    11. coroutine = so_some_work(2) 
    12.  
    13. loop = asyncio.get_event_loop() 
    14. loop.run_until_complete(coroutine) 
    15.  
    16. print('TIME: ',now() - start) 
    17.  

    这是协程的定义方法,注意协程并不是可以直接执行的函数. 这里通过get_event_loop()方法获取一个一个事件循环,然后通过run_until_complete()方法讲协程注册到消息循环中去,并启动消息循环.

    创建一个task

    Tasks are used to schedule coroutines concurrently.
    When a coroutine is wrapped into a Task with functions like asyncio.create_task() the coroutine is automatically scheduled to run soon

    task是Future的子类,保存了协程运行后的状态,用于未来获取协程结果.

    协程是不能直接运行的,在使用方法run_until_complete()方法注册协程的时候,协程就自动被包装成了一个task对象.

    1. import asyncio 
    2. import time 
    3.  
    4. async def do_some_work(x): 
    5. print('Waiting: ', x) 
    6.  
    7. now = lambda: time.time() 
    8.  
    9. start = now() 
    10.  
    11. coroutine = do_some_work(2) 
    12. loop = asyncio.get_event_loop() 
    13. task = loop.create_task(coroutine) 
    14. print(task) 
    15.  
    16. loop.run_until_complete(task) 
    17. print(task) 
    18. print('TINE: ', now() - start) 
    19.  

    比如这个例子就创建了一个task.


    pic-1582726248201.png

    从这个结果来看,task在加入时间循环之前是pending的状态. 而运行完之后 是done的状态,而且还有有个result参数.

    绑定回调

    绑定回调,在task执行完毕之后可以获取执行的结果,也就是协程得返回值. 回调的最后一个参数是future对象,如果回调需要多个参数,也可以通过偏函数导入.

    单个参数的callback

    1. import asyncio 
    2. import time 
    3.  
    4. async def so_some_work(x): 
    5. print('Waitting: ', x) 
    6. return 'Done after {0}s'.format(x) 
    7.  
    8. def callback(future): 
    9. print('Callback: ', future.result()) 
    10.  
    11. now = lambda: time.time() 
    12.  
    13. start = now() 
    14.  
    15. coroutine = so_some_work(2) 
    16. loop = asyncio.get_event_loop() 
    17. task = asyncio.ensure_future(coroutine) 
    18. task.add_done_callback(callback) 
    19. loop.run_until_complete(task) 
    20.  
    21. print('TIME: ', now() - start) 
    22.  

    ensure_future()create_task()都可以创建一个task. 不过create_task()是python3.7之后新增的方法.

    有多个参数的callback

    1. def callback2(t, future): 
    2. print('Callback: ', t, future.result()) 
    3.  
    4. -------------------------------------------------- 
    5. task.add_done_callback(functools.partial(callback2, 2)) 
    6.  

    回调让程序逻辑变的更加复杂(就是容易写着写着就懵逼了). 其实在task执行完毕之后,可以直接使用result()方法获取其返回值. 这样就不需要回调了.

    1. import asyncio 
    2. import time 
    3.  
    4. now = lambda :time.time() 
    5.  
    6. async def do_some_work(x): 
    7. print('Waitting: ',x) 
    8. return 'Done after {}s'.format(x) 
    9.  
    10. start = now() 
    11.  
    12. coroutine = do_some_work(2) 
    13. loop = asyncio.get_event_loop() 
    14. task = asyncio.ensure_future(coroutine) 
    15. loop.run_until_complete(task) 
    16.  
    17. print('Task ret: {}'.format(task.result())) 
    18. print('TIME: {}'.format(now()-start)) 
    19.  

    await和阻塞

    async可以用来定义协程对象, await可以针对耗时的操作进行挂起,就像yield一样可以把协程暂时挂起,让出控制权.

    协程遇到await,时间循环会把当前协程暂时挂起,执行别的协程,知道其他的协程也挂起或者执行完毕,再进行下一个协程的执行

    针对一些耗时的IO操作一般会是用await的方式挂起它.

    1. import asyncio 
    2. import time 
    3.  
    4. now = lambda :time.time() 
    5.  
    6. async def do_some_work(x): 
    7. print('Waitting: ',x) 
    8. await asyncio.sleep(x) 
    9. return 'Done after {}s'.format(x) 
    10.  
    11. start = now() 
    12.  
    13. coroutine = do_some_work(2) 
    14. loop = asyncio.get_event_loop() 
    15. task = asyncio.ensure_future(coroutine) 
    16. loop.run_until_complete(task) 
    17.  
    18. print('Task ret: {}'.format(task.result())) 
    19. print('TIME: {}'.format(now()-start)) 
    20.  

    一个协程看不出啥区别. 多整几个来看看

    1. import asyncio 
    2. import time 
    3.  
    4. now = lambda :time.time() 
    5.  
    6. async def do_some_work(x): 
    7. print('Waitting: ',x) 
    8. await asyncio.sleep(x) 
    9. return 'Done after {}s'.format(x) 
    10.  
    11. start = now() 
    12.  
    13. coroutine1 = do_some_work(1) 
    14. coroutine2 = do_some_work(2) 
    15. coroutine3 = do_some_work(4) 
    16. loop = asyncio.get_event_loop() 
    17. tasks=[ 
    18. asyncio.ensure_future(coroutine1), 
    19. asyncio.ensure_future(coroutine2), 
    20. asyncio.ensure_future(coroutine3) 
    21. ] 
    22.  
    23. # loop.run_until_complete(asyncio.wait(tasks)) 
    24. loop.run_until_complete(asyncio.wait(tasks)) 
    25.  
    26. for task in tasks: 
    27. print('Task ret: ',task.result()) 
    28. print('TIME: {}'.format(now()-start)) 
    29.  

    其实我想说到这里,我才真正理解wait方法是干啥的...执行一个task或者协程的集合...

    这里还要多说一点就是并行和并发的区别.
    并行的意思是同一时刻有多个任务在执行.
    并发的意思是有多个任务要同时进行.
    按照我的理解,并发就是TDD,并行就是FDD

    协程的嵌套(这特么是有病吧...)

    不是有病...是为了实现更多的IO操作过程,也就是一个协程中await了另外一个协程

    1. import asyncio 
    2. import time 
    3.  
    4. now = lambda: time.time() 
    5.  
    6. async def do_some_work(x): 
    7. print('Waitting: ', x) 
    8. await asyncio.sleep(x) 
    9. return 'Done after {}s'.format(x) 
    10.  
    11. async def foo(): 
    12. coroutine1 = do_some_work(1) 
    13. coroutine2 = do_some_work(2) 
    14. coroutine3 = do_some_work(4) 
    15.  
    16. tasks = [ 
    17. asyncio.ensure_future(coroutine1), 
    18. asyncio.ensure_future(coroutine2), 
    19. asyncio.ensure_future(coroutine3) 
    20. ] 
    21. dones, pendings = await asyncio.wait(tasks) 
    22.  
    23. for task in dones: 
    24. print('Task ret: ', task.result()) 
    25.  
    26. start = now() 
    27. loop = asyncio.get_event_loop() 
    28. loop.run_until_complete(foo()) 
    29.  
    30. print('TIME: ', now() - start) 
    31.  

    协程的停止

    future有几个状态:

    • Pending: task刚创建的时候
    • Running: 时间循环调用执行的时候
    • Done: 执行完成的时候
    • Cancelled: 被cancel()之后
    1. import asyncio 
    2. import time 
    3.  
    4. now = lambda :time.time() 
    5.  
    6. async def do_some_work(x): 
    7. print('Waitting: ',x) 
    8. await asyncio.sleep(x) 
    9. print('Done after {}s'.format(x)) 
    10.  
    11. corutine1 = do_some_work(1) 
    12. corutine2 = do_some_work(2) 
    13. corutine3 = do_some_work(4) 
    14.  
    15. tasks = [ 
    16. asyncio.ensure_future(corutine1), 
    17. asyncio.ensure_future(corutine2), 
    18. asyncio.ensure_future(corutine3) 
    19. ] 
    20.  
    21. start = now() 
    22.  
    23. loop = asyncio.get_event_loop() 
    24.  
    25. try: 
    26. loop.run_until_complete(asyncio.wait(tasks)) 
    27. except KeyboardInterrupt as e: 
    28. print(asyncio.Task.all_tasks()) 
    29. for task in asyncio.Task.all_tasks(): 
    30. print(task.cancel()) 
    31. loop.stop() 
    32. loop.run_forever() 
    33. finally: 
    34. loop.close() 
    35.  
    36. print('TIME: ', now()-start) 
    37.  

    20200222-实在写不下去了,以后再补吧...


    参考资料:

    廖雪峰的网站

    python黑魔法-简书

  • 相关阅读:
    TensorFlow------学习篇
    汉语分词工具的研发-----
    SVM强化学习
    RNN和LSTM系统强化学习———
    CRF学习笔记
    Word2vector---------学习笔记
    FindAllAnagramsinaString
    Java中比较两个字符串是否相等的问题
    15003_特殊数字
    雷林鹏分享:Lua break 语句
  • 原文地址:https://www.cnblogs.com/thecatcher/p/12369468.html
Copyright © 2011-2022 走看看