zoukankan      html  css  js  c++  java
  • async 异步协程进阶

    协程通过 async/await 语法进行声明,是编写异步应用的推荐方式

    例如新定义一个协程(coroutine object)

    async def foo():
    return 42

    首先先来介绍下:

    认识aysn和asyncio都有哪些函数方法:

    创建一个future 对象:

    task = asyncio.create_task(foo())
    或者使用
    task=asyncio.ensure_future(foo())
    那么如何判断创建的task到底是不是future 对象呢?
    async def exetask():
    # task = asyncio.create_task(foo())
    task = asyncio.ensure_future(foo())
    if isinstance(task,asyncio.Future):
    print("yes")
    else:
    print("no")
    s=await task
    asyncio.run(exetask())
    结果如下:

      yes

    答案是肯定的。

    2.如何运行一个协程:

    要真正运行一个协程,asyncio 提供了三种主要机制:

    第一种:

    • asyncio.run() 函数用来运行最高层级的入口点 "main()" 函数 (参见上面的示例。)

    第二种:

    通过loop对象实现:

    loop=asyncio.get_event_loop()
    async  def  jobs():
    i=10
    while i>0:
    # time.sleep(0.5)
    i=i-1
    return 0
    tasks=[asyncio.ensure_future(jobs(k)) for k in range(1,4)]
    res=loop.run_until_complete(asyncio.gather(*tasks))
    print(res)
    loop.close()

    3.并发

    第1种并发运行:
    当一个协程通过 asyncio.create_task() 等函数被打包为一个 任务asyncio.create_task(coro)将 coro 协程 打包为一个 Task 排入日程准备执行,
    返回 Task 对象。此函数 在 Python 3.7 中被加入。在 Python 3.7 之前,of course你也可以改用低层级的 asyncio.ensure_future() 函数, This works in all Python versions but is less readable
    #### ordinal job async
    async def jobs():
    i=10
    while i>0:
    # time.sleep(0.5)
    i=i-1
    return 0
    async def get_status():
    r=await jobs()
    return r
    loop=event=asyncio.get_event_loop()
    tasks = [asyncio.create_task(get_status()) for k in range(1,4)]
    for task in tasks:
        r=loop.run_until_complete(asyncio.wait_for(task,timeout=10))
        print(r)
    

      这里我采用了比较低级的loop事件对象来调用run_until_complete() 方法来实现:

    事实上开发者一般更喜欢采用高级用法asyncio.wait()或者asyncio.wait_for()来实现这写异步任务调用:

    在进行下面介绍之前我想你应该先了解:asyncio.wait_for(awtimeout*loop=None) 

      等待 aw 可等待对象 完成,指定 timeout 秒数后超时。

      如果 aw 是一个协程,它将自动作为任务加入日程。

      timeout 可以为 None,也可以为 float 或 int 型数值表示的等待秒数。如果 timeout 为 None,则等待直到完成。

      如果发生超时,任务将取消并引发 asyncio.TimeoutError.

      要避免任务 取消,可以加上 shield()

      函数将等待直到目标对象确实被取消,所以总等待时间可能超过 timeout 指定的秒数。

      如果等待被取消,则 aw 指定的对象也会被取消。

      loop 参数已弃用,计划在 Python 3.10 中移除。

    asyncio.wait(aws*loop=Nonetimeout=Nonereturn_when=ALL_COMPLETED)

    并发运行 aws 指定的 可等待对象 并阻塞线程直到满足 return_when 指定的条件。

    如果 aws 中的某个可等待对象为协程,它将自动作为任务加入日程。直接向 wait() 传入协程对象已弃用,因为这会导致 令人迷惑的行为

    返回两个 Task/Future 集合: (done, pending)

    用法:

    done, pending = await asyncio.wait(aws)
    

    loop 参数已弃用,计划在 Python 3.10 中移除。

    如指定 timeout (float 或 int 类型) 则它将被用于控制返回之前等待的最长秒数。

    请注意此函数不会引发 asyncio.TimeoutError。当超时发生时,未完成的 Future 或 Task 将在指定秒数后被返回。

    return_when 指定此函数应在何时返回。它必须为以下常数之一:

    常数

    描述

    FIRST_COMPLETED

    函数将在任意可等待对象结束或取消时返回。

    FIRST_EXCEPTION

    函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于 ALL_COMPLETED

    ALL_COMPLETED

    函数将在所有可等待对象结束或取消时返回。

    与 wait_for() 不同,wait() 在超时发生时不会取消可等待对象。


    如何验证wait 不取消,wait_for 取消aw对象呢:

    我们来看个例子:

    先看wait_for:

    async def foo(k=0):
    await asyncio.sleep(30)
    return k

    async def exetask():
    task=asyncio.create_task(foo(k=1))
    try:
    await asyncio.wait_for(task,timeout=1)
    print(task.cancelled())
    #3.7 改为当 aw 因超时被取消,wait_for 会等待 aw 被取消,3.7之前直接报异常,
    except asyncio.TimeoutError:
    print("timeout ")
    # task.cancel()
    print(task.cancelled())
    asyncio.run(exetask())
    结果:

    C:Python37python.exe D:/workspace/AutoFate/src/commonutils/asyncutils.py
    timeout
    True

    再看wait,这里注意由于waitexpect a list of futures, not Task,我换种写法:

    async def foo(k=0):
    await asyncio.sleep(30)
    return k


    async def exetask():
    task = asyncio.create_task(foo(k=1))
    try:
    #请注意wait函数不会引发 asyncio.TimeoutError
    await asyncio.wait([task], timeout=1)
    ## 3.7 改为当 aw 因超时被取消,wait_for 会等待 aw 被取消,3.7之前直接报异常,
    # await asyncio.wait_for(task,timeout=1)

    except asyncio.TimeoutError:
    print("timeout ")
    print(task.cancelled())


    asyncio.run(exetask())

    结果:

    C:Python37python.exe D:/workspace/AutoFate/src/commonutils/asyncutils.py
    False

    Process finished with exit code 0

     这里完美的展现了阻塞的魅力!!!!!!!

     
    第2种并发运行:
    awaitable asyncio.gather(*awsloop=Nonereturn_exceptions=False)

    并发 运行 aws 序列中的 可等待对象

    下面来看个简单的例子:

    async def count(k):
        print(f"start job{k} {time.asctime()} ")
        await asyncio.sleep(0.6)
        print(f"end job{k} {time.asctime()}")
        return k
    async def mayns():
        r=await asyncio.gather(count(1),count(2))
        return r
    def test():
        import time
        st=time.perf_counter()
        results=asyncio.run(mayns())
        elapsed=time.perf_counter()-st
        print(f"result return :{results}  execute in {elapsed:0.2} seconds.")
    

    运行结果:

    start job1 Fri Dec 13 23:00:38 2019
    start job2 Fri Dec 13 23:00:38 2019
    end job1 Fri Dec 13 23:00:39 2019
    end job2 Fri Dec 13 23:00:39 2019
    result return :[1, 2] execute in 0.6 seconds.

    gather直接返回的是调用的task的所有result列表

    当然你也可以这样搜集任务:

    最后介绍一下如何实现异步http请求:
    # request 库同步阻塞,aiohttp才是异步的请求,pip install aiohttp
    from aiohttp import ClientSession as session
    async def  test2(k):
        r=await other_test(k)
        return r
    async def  other_test(k):
        print("start await job %s,%s"%(k,time.asctime()))
        urls = ["http://127.0.0.1:8000/index", "http://127.0.0.1:8000/stuTable/"]
        async with session() as request:
            async with request.get(urls[0]) as rq:
                r=await  rq.read()
                res=r.decode("utf-8")
        print("end await job %s,%s"%(k,time.asctime()))
        return res
    
    def test_aiohttp():
        klist=[100,50,88]
        loop=asyncio.get_event_loop()
        tasks=[asyncio.ensure_future(test2(k)) for k in klist]
        # loop.run_until_complete(asyncio.wait(tasks))
        #可通过asyncio.gather(*tasks)将响应全部收集起来
        res=loop.run_until_complete(asyncio.gather(*tasks))
        print(res)
        loop.close()
    

      结果:

    C:Python37python.exe D:/workspace/AutoFate/src/commonutils/asyncutils.py
    start await job 100,Fri Dec 13 23:54:31 2019
    start await job 50,Fri Dec 13 23:54:31 2019
    start await job 88,Fri Dec 13 23:54:31 2019
    end await job 88,Fri Dec 13 23:54:31 2019
    end await job 50,Fri Dec 13 23:54:31 2019
    end await job 100,Fri Dec 13 23:54:31 2019
    ['{"user": "test001", "msg": "this is test index view "}', '{"user": "test001", "msg": "this is test index view "}', '{"user": "test001", "msg": "this is test index view "}']

    Process finished with exit code 0


    服务是自己用djangO起的:
    from django.shortcuts import render
    # Create your views here.
    from django.http import HttpResponse
    import json
    from . models import Student,Grade
    from django.db import models
    def index(request):
    data={"user":"test001","msg":"this is test index view "}
    js=json.dumps(data)
    return HttpResponse(js)



    def stuTable(request):
    import time
    time.sleep(3)
    # stu_object=Student.objects.all()
    # return render(request,template_name="index.html",context={"student_object":stu_object})
    return HttpResponse(json.dumps({"A":888,"NN":899}))
     


  • 相关阅读:
    Linux下编辑、编译、调试命令总结——gcc和gdb描述
    scanf函数读取缓冲区数据的问题
    Windows下设置Ubuntu引导项
    前端术语汇总笔记(会保持更新)
    实现动态加载一个 JavaScript 资源
    提取一个字符串中的数字,并将其转为数组
    CSS3图片倒影技术
    js函数聚合
    js继承函数封装
    联动菜单实现思路
  • 原文地址:https://www.cnblogs.com/SunshineKimi/p/12036896.html
Copyright © 2011-2022 走看看