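async/await are the keywords introduced in Python 3.5 for defining coroutines: async def defines a coroutine, and await suspends the current coroutine at a call that would otherwise block, until that call completes. asyncio is single-threaded: there is only the main thread, yet it can interleave many tasks. A task here is a special kind of Future object. All of these tasks are scheduled by an object called the event loop.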
Official documentation: https://docs.python.org/3/library/asyncio.html
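To make the single-thread claim concrete, here is a minimal sketch. The names worker and main and the delays are made up for illustration. It records which thread runs each step and shows that a Task is a Future.

```python
import asyncio
import threading


async def worker(name, delay, log):
    # Record the thread that runs this coroutine before and after suspending.
    log.append((name, "start", threading.current_thread().name))
    await asyncio.sleep(delay)  # suspension point: the loop runs other tasks meanwhile
    log.append((name, "end", threading.current_thread().name))


async def main():
    log = []
    # create_task() wraps each coroutine in a Task and schedules it on the running loop.
    first = asyncio.create_task(worker("a", 0.2, log))
    second = asyncio.create_task(worker("b", 0.1, log))
    is_future = isinstance(first, asyncio.Future)  # Task is a subclass of Future
    await first
    await second
    return log, is_future


if __name__ == "__main__":
    entries, is_future = asyncio.run(main())
    for entry in entries:
        print(entry)
    print("Task is a Future:", is_future)
```

Both workers start before either finishes, and every entry carries the same thread name, which is the interleaving described above.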
Basic asyncio usage
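import asyncio
import time


async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)  # stand-in for a slow network request
    print("end get url")


def test_1():
    start_time = time.time()
    loop = asyncio.new_event_loop()
    # asyncio.wait() takes an iterable of tasks; since Python 3.11 it rejects bare coroutines
    tasks = [loop.create_task(get_html("http://www.baidu.com")) for _ in range(10)]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    print(time.time() - start_time)  # roughly 2 seconds, because the ten sleeps overlap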
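The benefit of scheduling tasks is easiest to see next to a sequential version. The sketch below is an illustration only: fake_request, sequential and concurrent are hypothetical names, and the short sleep stands in for network I/O. It uses asyncio.run, the higher-level entry point available since Python 3.7.

```python
import asyncio
import time


async def fake_request(delay):
    # Placeholder for a network call; only the waiting time matters here.
    await asyncio.sleep(delay)


async def sequential(n, delay):
    # Each await finishes before the next request starts, so the delays add up.
    start = time.perf_counter()
    for _ in range(n):
        await fake_request(delay)
    return time.perf_counter() - start


async def concurrent(n, delay):
    # All tasks are scheduled first, so their waiting periods overlap.
    start = time.perf_counter()
    tasks = [asyncio.create_task(fake_request(delay)) for _ in range(n)]
    await asyncio.wait(tasks)
    return time.perf_counter() - start


if __name__ == "__main__":
    print("sequential:", asyncio.run(sequential(5, 0.05)))
    print("concurrent:", asyncio.run(concurrent(5, 0.05)))
```

The sequential run takes about n times the delay, while the concurrent run takes about one delay.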
Receiving return values
async def get_html1(url):
    print("start get url")
    await asyncio.sleep(2)
    print("end get url")
    return "return value"


def test_2():
    start_time = time.time()
    loop = asyncio.new_event_loop()
    # Multiple requests: keep the Task objects so their results can be read afterwards
    tasks = [loop.create_task(get_html1("http://www.baidu.com")) for _ in range(10)]
    loop.run_until_complete(asyncio.wait(tasks))
    for task in tasks:
        print(task.result())
    print(time.time() - start_time)
    # Single request
    task = loop.create_task(get_html1("http://www.baidu.com"))
    loop.run_until_complete(task)
    loop.close()
    print(task.result())
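asyncio.wait itself returns two sets, the finished and the unfinished tasks. Sets are unordered, so reading results from the original task list keeps them in submission order. A small sketch, where square is a made-up coroutine used only for illustration:

```python
import asyncio


async def square(n):
    # Larger inputs finish later, so completion order differs from submission order.
    await asyncio.sleep(0.01 * n)
    return n * n


async def main():
    tasks = [asyncio.create_task(square(n)) for n in (3, 1, 2)]
    # With the default return_when=ALL_COMPLETED, pending is empty on return.
    done, pending = await asyncio.wait(tasks)
    # Iterate the original list, not the done set, to keep submission order.
    ordered = [task.result() for task in tasks]
    return ordered, len(done), len(pending)


if __name__ == "__main__":
    print(asyncio.run(main()))
```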
Using gather
async def run_groups():
    # gather is a higher-level API than wait: it accepts coroutines directly
    group1 = [get_html("http://www.baidu.com") for i in range(2)]
    group2 = [get_html("http://www.baidu.com+++++") for i in range(2)]
    await asyncio.gather(*group1, *group2)
    # gather calls can also be nested, one per group
    group1 = [get_html("http://www.baidu.com") for i in range(2)]
    group2 = [get_html("http://www.baidu.com+++++") for i in range(2)]
    g1 = asyncio.gather(*group1)
    g2 = asyncio.gather(*group2)
    await asyncio.gather(g1, g2)


def test_3():
    loop = asyncio.new_event_loop()
    # gather() is called inside a coroutine so that it always sees the running loop
    loop.run_until_complete(run_groups())
    loop.close()
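Two properties of gather are worth seeing in isolation: results come back in the order the awaitables were passed, and return_exceptions=True turns a failure into an ordinary list entry instead of raising. In this sketch, fetch_length and its URL check are invented for the example.

```python
import asyncio


async def fetch_length(url):
    # Hypothetical stand-in for a request: fails for non-http URLs.
    await asyncio.sleep(0.01)
    if not url.startswith("http"):
        raise ValueError(f"bad url: {url}")
    return len(url)


async def main():
    # Results keep the argument order; the exception is returned, not raised.
    return await asyncio.gather(
        fetch_length("http://a"),
        fetch_length("ftp://b"),
        fetch_length("http://ccc"),
        return_exceptions=True,
    )


if __name__ == "__main__":
    for result in asyncio.run(main()):
        print(repr(result))
```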
Callbacks
from functools import partial


def callback1(url, arg, future):
    print(url, arg)
    print(f"callback result: {future.result()}")


def test_4():
    loop = asyncio.new_event_loop()
    task = loop.create_task(get_html1("http://www.baidu.com"))
    # add_done_callback passes the finished task to the callback.
    # Mind the argument order: partial binds url and arg first, so the task arrives last.
    task.add_done_callback(partial(callback1, "http://www.baidu.com", "gdwjd"))
    loop.run_until_complete(task)
    print("test result", task.result())
    loop.close()
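The same argument-order rule can be checked without printing, by letting the callback record what it receives. This is only a sketch. on_done, fetch and the records list are hypothetical names, and example.com is a placeholder URL.

```python
import asyncio
from functools import partial


def on_done(url, tag, records, future):
    # partial supplies url, tag and records; the finished future is appended last.
    records.append((url, tag, future.result()))


async def fetch(url):
    await asyncio.sleep(0.01)  # placeholder for real I/O
    return "done"


async def main():
    records = []
    task = asyncio.create_task(fetch("http://example.com"))
    task.add_done_callback(partial(on_done, "http://example.com", "tag", records))
    await task
    # Yield once more so any callback still queued on the loop has run.
    await asyncio.sleep(0)
    return records


if __name__ == "__main__":
    print(asyncio.run(main()))
```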
Cancelling coroutines
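def test_5():
    loop = asyncio.new_event_loop()
    tasks = [loop.create_task(get_html("http://www.baidu.com")) for _ in range(3)]
    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt:
        # Press Ctrl+C while the tasks are sleeping to reach this branch.
        # asyncio.Task.all_tasks() was removed in Python 3.9; use asyncio.all_tasks(loop).
        for task in asyncio.all_tasks(loop):
            print("=====", task.cancel())  # cancel() returns True when cancellation was requested
        # stop() followed by run_forever() runs the loop for one more pass,
        # which lets the cancellations reach the tasks
        loop.stop()
        loop.run_forever()
    finally:
        loop.close()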
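Cancellation reaches the coroutine as an asyncio.CancelledError raised at its current await, which gives it a chance to clean up. A minimal sketch of that behavior, with slow_job and the log list as illustrative names:

```python
import asyncio


async def slow_job(log):
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        log.append("cleanup")  # release resources here
        raise  # re-raise so the task ends up in the cancelled state


async def main():
    log = []
    task = asyncio.create_task(slow_job(log))
    await asyncio.sleep(0.05)  # give the task time to start sleeping
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log, task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Swallowing CancelledError instead of re-raising would make the task finish normally, so task.cancelled() would then report False.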
Nesting coroutines
import aiohttp


async def get(url, i):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as res:
            return res.status, i


async def hello(i):
    url = "http://www.taobao.com"
    print(i, url)
    # Awaiting another coroutine suspends hello() until get() returns
    res = await get(url, i)
    print("status code:", res)


def test_6():
    start = time.time()
    loop = asyncio.new_event_loop()
    tasks = [loop.create_task(hello(i)) for i in range(50)]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    print(time.time() - start)
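Starting 50 requests at once can overwhelm a server, and a common way to bound this is asyncio.Semaphore. The sketch below replaces the real HTTP call with a short sleep so that it runs without aiohttp or network access. fetch, the stats dictionary and the limit of 5 are assumptions made for illustration.

```python
import asyncio


async def fetch(i, limiter, stats):
    # At most `limit` coroutines are inside this block at any moment.
    async with limiter:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # stand-in for the real session.get(url)
        stats["active"] -= 1
        return 200, i


async def main(total=50, limit=5):
    limiter = asyncio.Semaphore(limit)
    stats = {"active": 0, "peak": 0}
    results = await asyncio.gather(*(fetch(i, limiter, stats) for i in range(total)))
    return results, stats["peak"]


if __name__ == "__main__":
    results, peak = asyncio.run(main())
    print(len(results), "responses, peak concurrency", peak)
```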
Example: producer/consumer communication
import random


async def consumer(queue, name):
    while True:
        val = await queue.get()
        print('{} get a val: {}'.format(name, val))
        await asyncio.sleep(1)


async def producer(queue, name):
    for _ in range(5):
        val = random.randint(0, 10)
        await queue.put(val)
        print('{} put a val: {}'.format(name, val))
        await asyncio.sleep(1)


async def main():
    queue = asyncio.Queue()
    consumer_1 = asyncio.create_task(consumer(queue, 'consumer_1'))
    consumer_2 = asyncio.create_task(consumer(queue, 'consumer_2'))
    producer_1 = asyncio.create_task(producer(queue, 'producer_1'))
    producer_2 = asyncio.create_task(producer(queue, 'producer_2'))
    # The consumers loop forever, so cancel them after 10 seconds,
    # by which time the producers have finished and no more messages arrive
    await asyncio.sleep(10)
    consumer_1.cancel()
    consumer_2.cancel()
    # return_exceptions=True keeps the consumers' CancelledError from propagating out of gather
    await asyncio.gather(consumer_1, consumer_2, producer_1, producer_2, return_exceptions=True)


def test_7():
    start = time.perf_counter()
    asyncio.run(main())
    end = time.perf_counter()
    print(end - start)
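An alternative to a fixed 10-second wait is to let the queue signal when all work is done. Each consumer calls task_done() after handling an item, and the main coroutine awaits queue.join(). This is a sketch of that variant, not the approach used above. worker and the doubling step are placeholders.

```python
import asyncio


async def worker(queue, results):
    while True:
        item = await queue.get()
        results.append(item * 2)  # placeholder for real processing
        queue.task_done()  # tell the queue this item is fully handled


async def main():
    queue = asyncio.Queue()
    results = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(2)]
    for i in range(5):
        queue.put_nowait(i)
    # join() returns once task_done() has been called for every item put on the queue.
    await queue.join()
    for task in workers:
        task.cancel()
    # Collect the cancelled workers without letting CancelledError propagate.
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(results)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With this pattern the program ends as soon as the last item is processed, rather than after a timeout chosen in advance.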