zoukankan      html  css  js  c++  java
  • Python的异步编程[0] -> 协程[1] -> 使用协程建立自己的异步非阻塞模型

    使用协程建立自己的异步非阻塞模型


    接下来例子中,将使用纯粹的Python编码搭建一个异步模型,相当于自己构建的一个asyncio模块,这也许能对asyncio模块底层实现的理解有更大的帮助。主要参考为文末的链接,以及自己的补充理解。

    完整代码

      1 #!/usr/bin/python
      2 # =============================================================
      3 # File Name: async_base.py
      4 # Author: LI Ke
      5 # Created Time: 1/29/2018 09:18:50
      6 # =============================================================
      7 
      8 
      9 import types
     10 import time
     11 
     12 
     13 @types.coroutine
     14 def switch():
     15     print('Switch: Start')
     16     yield
     17     print('Switch: Done')
     18 
     19 async def coro_1():
     20     print('C1: Start')
     21     await switch()
     22     print('C1: Stop')
     23 
     24 
     25 async def coro_2():
     26     print('C2: Start')
     27     print('C2: 1')
     28     print('C2: 2')
     29     print('C2: 3')
     30     print('C2: Stop')
     31 
     32 c_1 = coro_1()
     33 c_2 = coro_2()
     34 
     35 try:
     36     c_1.send(None)
     37 except StopIteration:
     38     pass
     39 try:
     40     c_2.send(None)
     41 except StopIteration:
     42     pass
     43 try:
     44     c_1.send(None)
     45 except StopIteration:
     46     pass
     47 
     48 print('--------------------------------')
     49 
     50 def run(coros):
     51     coros = list(coros)
     52 
     53     while coros:
     54         # Duplicate list for iteration so we can remove from original list
     55         for coro in list(coros):
     56             try:
     57                 coro.send(None)
     58             except StopIteration:
     59                 coros.remove(coro)
     60 
     61 c_1 = coro_1()
     62 c_2 = coro_2()
     63 run([c_1, c_2])
     64 
     65 print('--------------------------------')
     66 
     67 @types.coroutine
     68 def action(t):
     69     trace=[]
     70     while True:
     71         trace.append(time.time())
     72         if trace[-1] - trace[0] > t:
     73             break # This break will end this function and raise a StopIteration
     74         yield
     75 
     76 async def coro_1():
     77     print('C1: Start')
     78     await action(2)
     79     print('C1: Stop')
     80 
     81 
     82 async def coro_2():
     83     print('C2: Start')
     84     await action(3)
     85     print('C2: Stop')
     86 
     87 def timeit(f):
     88     def _wrapper(*args, **kwargs):
     89         start = time.time()
     90         re = f(*args, **kwargs)
     91         end = time.time()
     92         print('Time cost:', f.__name__, end-start)
     93         return re
     94     return _wrapper
     95 
     96 c_1 = coro_1()
     97 c_2 = coro_2()
     98 timeit(run)([c_1])
     99 timeit(run)([c_2])
    100 
    101 print('--------------------------------')
    102 
    103 c_1 = coro_1()
    104 c_2 = coro_2()
    105 timeit(run)([c_1, c_2])
    View Code

    分段解释

    首先会导入需要的模块,这里仅仅使用types和time两个模块,放弃异步I/O的asyncio模块。

    1 import types
    2 import time

    接下来定义一个switch函数,利用types.coroutine装饰器将switch装饰成一个协程,这个协程将完成一个切换功能。

    1 @types.coroutine
    2 def switch():
    3     print('Switch: Start')
    4     yield
    5     print('Switch: Done')

    随后定义第一个协程,协程启动后,会进入一个await,即切入刚才的switch协程,这里使用async和await关键字完成对协程的定义。

    1 async def coro_1():
    2     print('C1: Start')
    3     await switch()
    4     print('C1: Stop')

    同样的,再定义第二个协程,第二个协程将从头到尾顺序执行。

    1 async def coro_2():
    2     print('C2: Start')
    3     print('C2: 1')
    4     print('C2: 2')
    5     print('C2: 3')
    6     print('C2: Stop')

    有了上面的两个协程,但我们在异步时,希望在执行完C_1的start后,切换进协程C_2,执行完成后再切换回来。那么此时就需要一个对协程切换进行控制的程序,具体顺序如下,

    1. 启动协程c_1,启动后会切换进switch函数,
    2. Switch中由于yield而切出,并保留上下文环境
    3. c_1.send()将获得返回结果(如果有的话),并继续执行
    4. 此时c_1已经被中止,启动c_2,则完成所有执行步骤,捕获生成器的中止异常
    5. 这时c_2以执行完毕,再次切回c_1(此时会从switch yield之后开始执行)继续执行。
     1 c_1 = coro_1()
     2 c_2 = coro_2()
     3 
     4 try:
     5     c_1.send(None)
     6 except StopIteration:
     7     pass
     8 try:
     9     c_2.send(None)
    10 except StopIteration:
    11     pass
    12 try:
    13     c_1.send(None)
    14 except StopIteration:
    15     pass

    最终得到结果如下,可以看到,整个过程完全按期望的流程进行,

    C1: Start
    Switch: Start
    C2: Start
    C2: 1
    C2: 2
    C2: 3
    C2: Stop
    Switch: Done
    C1: Stop

    但是这里的协程运行部分仍需改善,于是接下来便定义一个run函数用于执行一个协程列表。

    run函数首先会遍历协程列表的副本,并不断尝试启动列表中的协程,当协程结束后便将协程从协程列表中删除,直到所有的协程都执行完毕为止。

     1 def run(coros):
     2     coros = list(coros)
     3 
     4     while coros:
     5         # Duplicate list for iteration so we can remove from original list
     6         for coro in list(coros):
     7             try:
     8                 coro.send(None)
     9             except StopIteration:
    10                 coros.remove(coro)
    11 
    12 c_1 = coro_1()
    13 c_2 = coro_2()
    14 run([c_1, c_2])

    测试一下run函数,得到结果与前面相同,

    C1: Start
    Switch: Start
    C2: Start
    C2: 1
    C2: 2
    C2: 3
    C2: Stop
    Switch: Done
    C1: Stop

    到目前为止,完成了一个简单的异步模型的搭建,即c_2无需等待c_1执行完成再继续执行,而是由c_1交出了控制权进行协作完成,同时也不存在多线程的抢占式任务,因为由始至终都只有一个线程在运行,而且也没有混乱的回调函数存在。

    但是,还存在一个阻塞问题没有解决,也就是说,如果c_1中的switch函数是一个耗时的I/O操作或其他阻塞型操作,则此时需要等待switch的阻塞操作完成才能交出控制权,可如果希望在等待这个耗时操作时,先去执行c_2的任务,再回来检测c_1中的耗时操作是否完成,则需要使用非阻塞的方式。

    首先,对刚才的switch进行改造,完成一个action协程,这个协程会根据传入的参数,执行对应时间后,再退出协程引发StopIteration,实现方式如下,每次切换进action中都会记录下时间,然后将时间和第一次进入的时间进行对比,如果超过了设置的时间便退出,如果没超过限制时间,则切出协程交还出控制权。

    1 @types.coroutine
    2 def action(t):
    3     trace=[]
    4     while True:
    5         trace.append(time.time())
    6         if trace[-1] - trace[0] > t:
    7             break # This break will end this function and raise a StopIteration
    8         yield

    接着定义两个协程,分别执行action时间为2秒和3秒,同时定义一个计算时间的装饰器,用于时间记录。

     1 async def coro_1():
     2     print('C1: Start')
     3     await action(2)
     4     print('C1: Stop')
     5 
     6 
     7 async def coro_2():
     8     print('C2: Start')
     9     await action(3)
    10     print('C2: Stop')
    11 
    12 def timeit(f):
    13     def _wrapper(*args, **kwargs):
    14         start = time.time()
    15         re = f(*args, **kwargs)
    16         end = time.time()
    17         print('Time cost:', f.__name__, end-start)
    18         return re
    19     return _wrapper

    然后我们先分别运行两个协程进行一个实验,

    1 c_1 = coro_1()
    2 c_2 = coro_2()
    3 timeit(run)([c_1])
    4 timeit(run)([c_2])

    从输出的结果可以看到两个协程的耗时与action执行的时间基本相同,且顺序执行的时间为两者之和,

    C1: Start
    C1: Stop
    Time cost: run 2.030202865600586
    C2: Start
    C2: Stop
    Time cost: run 3.0653066635131836

    接下来,利用异步非阻塞的方式来执行这两个协程,

    1 c_1 = coro_1()
    2 c_2 = coro_2()
    3 timeit(run)([c_1, c_2])

    最后得到结果

    C1: Start
    C2: Start
    C1: Stop
    C2: Stop
    Time cost: run 3.0743072032928467

    从结果中可以看到,此时的运行方式是异步的形式,c_1启动后由于进入一个耗时action,且action被我们设置为非阻塞形式,因此c_1交出了控制权,控制权回到run函数后,启动了c_2,而c_2同样也进入到action中,这时两个协程都在等待任务完成,而监视run则在两个协程中不停轮询,不断进入action中查看各自的action操作是否完成,当有协程完成后,将继续启动这个协程的后续操作,直到最终所有协程结束。

    按照非阻塞异步协程的方式,可以以单线程运行,避免资源锁的建立,也消除了线程切换的开销,并且最终获得了类似多线程运行的时间性能。

    相关阅读


    1. 协程和 async / await

    参考链接


    http://www.oschina.net/translate/playing-around-with-await-async-in-python-3-5

  • 相关阅读:
    [GSEAPY] 在Python里进行基因集富集分析
    scRNAseq R包公共单细胞数据获取
    pybedtools:在Python中使用BEDTools
    pybedtools 提取序列
    将博客搬至CSDN
    【转】SELECT INTO 和 INSERT INTO SELECT 两种表复制语句
    sql长日期数据以短日期格式显示【转】
    [转]YouTube架构学习体会
    [转]让Nginx 支持 ASP ASP.NET配置方法
    [转]LINQ查询总结
  • 原文地址:https://www.cnblogs.com/stacklike/p/8379339.html
Copyright © 2011-2022 走看看