  • Python asynchronous programming with async/await

    The code in this article was run on Python 3.6.

    History

    - 3.3: The yield from expression allows for generator delegation.
    - 3.4: asyncio was introduced in the Python standard library with provisional API status.
    - 3.5: async and await became a part of the Python grammar, used to signify and wait on coroutines. They were not yet reserved keywords. (You could still define functions or variables named async and await.)
    - 3.6: Asynchronous generators and asynchronous comprehensions were introduced. The API of asyncio was declared stable rather than provisional.
    - 3.7: async and await became reserved keywords. (They cannot be used as identifiers.) They are intended to replace the asyncio.coroutine() decorator. asyncio.run() was introduced to the asyncio package, among a bunch of other features.
    

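    Under the hood, async/await is built on coroutines: when a coroutine reaches an await, it gives up the CPU, and execution switches back to it once the result is available. Compared with coordinating execution order through callbacks, the code inside each coroutine runs sequentially, which makes it much friendlier to write.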
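The hand-off described above can be observed directly. The following is a minimal sketch, assuming Python 3.7+ (for asyncio.run); the names worker and order are made up for illustration. Each coroutine reads top to bottom, yet the two interleave at their await points.

```python
import asyncio

order = []  # records the order in which the steps actually run


async def worker(name, delay):
    order.append(f"{name} start")
    await asyncio.sleep(delay)  # control goes back to the event loop here
    order.append(f"{name} end")


async def main():
    # Both workers are scheduled together; "a" sleeps longer than "b".
    await asyncio.gather(worker("a", 0.2), worker("b", 0.1))


asyncio.run(main())
print(order)  # ['a start', 'b start', 'b end', 'a end']
```

Worker "a" starts first but finishes last, because "b" runs while "a" is waiting.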

    Syntax

    async def

    What does calling an async function return?

    async def fun1():
        return
    
    async def fun2():
        yield
    
    print(type(fun1()))
    print(type(fun2()))
    
    # <class 'coroutine'>
    # <class 'async_generator'>
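Neither object has run any of the function body at the point it is created. As a small sketch (assuming Python 3.7+ for asyncio.run; answer and countdown are hypothetical names), a coroutine object produces its value when awaited, and an async generator is consumed with async for or an async comprehension.

```python
import asyncio


async def answer():
    return 42


async def countdown(n):
    # The yield makes this an async generator rather than a coroutine.
    while n > 0:
        yield n
        n -= 1


async def main():
    value = await answer()                   # run the coroutine to completion
    items = [i async for i in countdown(3)]  # drain the async generator
    return value, items


result = asyncio.run(main())
print(result)  # (42, [3, 2, 1])
```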
    

    await

    • await can only be used inside an async function

      async def f(x):
          y = await z(x)  # OK - `await` and `return` allowed in coroutines
          return y
      
      async def g(x):
          yield x  # OK - this is an async generator
      
      async def m(x):
          yield from gen(x)  # No - SyntaxError
      
      def m(x):
          y = await z(x)  # Still no - SyntaxError (no `async def` here)
          return y
      
    • await must be followed by an awaitable object (in most cases a coroutine object)
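To illustrate the second rule: besides coroutines, Tasks and Futures, any object that defines __await__ counts as awaitable. The sketch below assumes Python 3.7+; the Ready class is a hypothetical example, not part of asyncio. It also shows that awaiting a non-awaitable object fails at runtime with a TypeError.

```python
import asyncio
import inspect


class Ready:
    """Hypothetical awaitable that resolves immediately to a fixed value."""

    def __init__(self, value):
        self.value = value

    def __await__(self):
        # __await__ must return an iterator; reuse the one a coroutine provides.
        async def _resolve():
            return self.value
        return _resolve().__await__()


async def main():
    value = await Ready("ok")  # custom awaitable: allowed
    not_awaitable = 123
    try:
        await not_awaitable    # plain int: rejected at runtime
    except TypeError:
        rejected = True
    else:
        rejected = False
    return value, rejected


result = asyncio.run(main())
print(inspect.isawaitable(Ready("x")), inspect.isawaitable(123))  # True False
print(result)  # ('ok', True)
```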

    How to run (start) coroutines

    import time
    import asyncio
    
    
    async def coro(seq) -> list:
        """'IO' wait time is proportional to the max element."""
        await asyncio.sleep(max(seq))
        return list(reversed(seq))
    
    
    async def main():
        """
        ensure_future(): creates a task; on Python 3.7+, create_task() is the preferred way.
        t.done(): returns whether the task has finished (bool).
    
        Output:
        ------------------------------------------
        t: type <class '_asyncio.Task'>
        t done: False
        t result: [1, 2, 3]
        t done: True
        ------------------------------------------
        """
        # t = asyncio.create_task(coro([3, 2, 1]))  # Python 3.7+
        t = asyncio.ensure_future(coro([3, 2, 1]))  # Python 3.*
    
        print(f't: type {type(t)}')
        print(f't done: {t.done()}')
        a = await t
        print(f't result: {a}')
        print(f't done: {t.done()}')
    
    async def main2():
        """
        tasks = gather(): collects several tasks
        await tasks: returns once all tasks have finished, with results in input order
    
        Output:
        ------------------------------------------
        Start: 11:11:29
        [[1, 2, 3], [0, 5, 6]]
        End: 11:11:35
        Both tasks done: True
        ------------------------------------------
        """
        t = asyncio.ensure_future(coro([3, 2, 1]))
        t2 = asyncio.ensure_future(coro([6, 5, 0]))
        print('Start:', time.strftime('%X'))
        a = await asyncio.gather(t, t2)
        print(a)
        print('End:', time.strftime('%X'))  # About 6 seconds: the tasks run concurrently, so the longest sleep dominates
        print(f'Both tasks done: {all((t.done(), t2.done()))}')
    
    
    async def main3():
        """
        as_completed hands back results one at a time, as soon as each task finishes.
    
        Output:
        ------------------------------------------
        Start: 11:08:13
        res: [1, 2, 3] completed at 11:08:16
        # three seconds later
        res: [0, 5, 6] completed at 11:08:19
        End: 11:08:19
        Both tasks done: True
        ------------------------------------------
    
        """
        t = asyncio.ensure_future(coro([3, 2, 1]))
        t2 = asyncio.ensure_future(coro([6, 5, 0]))
        print('Start:', time.strftime('%X'))
        for res in asyncio.as_completed((t, t2)):
            compl = await res
            print(f'res: {compl} completed at {time.strftime("%X")}')
        print('End:', time.strftime('%X'))
        print(f'Both tasks done: {all((t.done(), t2.done()))}')
    
    
    if __name__ == '__main__':
        # python < 3.7
        loop = asyncio.get_event_loop()
        # loop.run_until_complete(main())
        # loop.run_until_complete(main2())
        loop.run_until_complete(main3())
        loop.close()
    
        # Python 3.7+
        # asyncio.run(main())
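The point of wrapping coroutines in tasks is that they start running right away instead of one after another. The following is a rough sketch, assuming Python 3.7+ (asyncio.run and asyncio.create_task); fetch and timed are hypothetical helpers, and asyncio.sleep stands in for real I/O. It compares awaiting two coroutines back to back with awaiting two tasks.

```python
import asyncio
import time


async def fetch(delay):
    await asyncio.sleep(delay)  # stand-in for real I/O
    return delay


async def sequential():
    # The second coroutine is only created and run after the first finishes.
    return [await fetch(0.2), await fetch(0.2)]


async def concurrent():
    # Both tasks are scheduled before either one is awaited.
    t1 = asyncio.create_task(fetch(0.2))
    t2 = asyncio.create_task(fetch(0.2))
    return [await t1, await t2]


def timed(coro_fn):
    start = time.perf_counter()
    result = asyncio.run(coro_fn())
    return result, time.perf_counter() - start


seq_result, seq_time = timed(sequential)
con_result, con_time = timed(concurrent)
print(f'sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s')
```

The sequential version should take roughly the sum of the delays, the concurrent one roughly the longest single delay.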
    
    

    DEMO

    Concurrent HTTP requests

    This demo uses aiohttp to send asynchronous requests.

    """
    Crawl 36kr data concurrently and save it to local files
    """
    import asyncio
    import aiohttp
    import json
    
    
    async def get_json(client, url):
        async with client.get(url) as response:
            assert response.status == 200
            return await response.read()
    
    
    async def get_36_kr(page, client):
        data1 = await get_json(client, 'https://36kr.com/api/search-column/mainsite?per_page=20&page={}'.format(page))
        j = json.loads(data1.decode('utf-8'))
    
        # Note: the ./content directory must already exist.
        with open(f"./content/{page}.json", "w", encoding='utf-8') as f:
            json.dump(j, f, indent=2, ensure_ascii=False)
    
        print('Done with page:', page)
    
    
    async def main():
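        # Created inside a coroutine, the session uses the running event loop,
        # so there is no need to rely on the module-level `loop` variable.
        client = aiohttp.ClientSession()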
        tasks = [asyncio.ensure_future(get_36_kr(i, client)) for i in range(100)]
        await asyncio.gather(*tasks)
        await client.close()
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
        loop.close()
    
    

    A simplified version (requires Python 3.7+)

    import asyncio
    import aiohttp
    import json
    
    
    async def get_36_kr(page, client):
        url = f'https://36kr.com/api/search-column/mainsite?per_page=1&page={page}'
        async with client.get(url, ssl=False) as response:
            assert response.status == 200
            return await response.json()
    
    
    async def main():
        async with aiohttp.ClientSession() as client:
            tasks = [asyncio.ensure_future(get_36_kr(i, client))
                     for i in range(2)]
            res = await asyncio.gather(*tasks)
            print(json.dumps(res, indent=2))
    
    if __name__ == '__main__':
        asyncio.run(main())
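Both demos start every request at once, which can be a lot for a real server when the page count is large. One common way to cap the number of requests in flight is asyncio.Semaphore. The sketch below is an illustration only, assuming Python 3.7+: fake_fetch, MAX_CONCURRENT and the counters are hypothetical, and asyncio.sleep replaces the aiohttp call so that the example runs without network access.

```python
import asyncio

MAX_CONCURRENT = 3  # assumed limit, chosen only for illustration
running = 0         # number of simulated requests currently in flight
peak = 0            # highest value of `running` seen so far


async def fake_fetch(page, sem):
    global running, peak
    async with sem:  # waits here while MAX_CONCURRENT requests are active
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # stand-in for client.get(url)
        running -= 1
        return page


async def main():
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    return await asyncio.gather(*(fake_fetch(i, sem) for i in range(10)))


pages = asyncio.run(main())
print(pages, 'peak concurrency:', peak)
```

In the real demo, the body of the async with sem block would wrap the client.get(...) call instead of the sleep.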
    

    Async IO libraries

    Libraries that can be used with async/await.

    • From aio-libs:

      • aiohttp: Asynchronous HTTP client/server framework
      • aioredis: Async IO Redis support
      • aiopg: Async IO PostgreSQL support
      • aiomcache: Async IO memcached client
      • aiokafka: Async IO Kafka client
      • aiozmq: Async IO ZeroMQ support
      • aiojobs: Jobs scheduler for managing background tasks
      • async_lru: Simple LRU cache for async IO
    • From magicstack:

      • uvloop: Ultra fast async IO event loop
      • asyncpg: (Also very fast) async IO PostgreSQL support
    • From other hosts:

      • trio: Friendlier asyncio intended to showcase a radically simpler design
      • aiofiles: Async file IO
      • asks: Async requests-like http library
      • asyncio-redis: Async IO Redis support
      • aioprocessing: Integrates multiprocessing module with asyncio
      • umongo: Async IO MongoDB client
      • unsync: Unsynchronize asyncio
      • aiostream: Like itertools, but async
  • Original article: https://www.cnblogs.com/aloe-n/p/10412391.html