zoukankan      html  css  js  c++  java
  • 并发编程之异步asyncio

    协程简介

    协程不是计算机提供的, 而是程序员任务创造的。

    协程(Coroutine), 也可以被称为微线程, 是一种用户态上下文切换技术。简而言之, 其实就是通过一个线程实现代码块互相切换执行。

    实现协程有这么几种方法

    • greenlet, 早期模块
    • yield关键字
    • asyncio装饰器
    • async、await关键字

    greenlet实现协程

    from greenlet import greenlet
    
    
    def func1():
        print(1)        # 第二步: 输出 1
        gr2.switch()    # 第三步: 切换到 func2 函数
        print(2)        # 第六步: 输出 2
        gr2.switch()    # 第七步: 切换到 func2 函数, 从上一次执行的位置继续向后执行
    
    
    def func2():
        print(3)        # 第四步: 输出 3
        gr1.switch()    # 第五步: 切换到 func1 函数, 从上一次执行的位置继续向后执行
        print(4)        # 第八步: 输出 4
    
    
    gr1 = greenlet(func1)
    gr2 = greenlet(func2)
    
    gr1.switch()    # 第一步: 去执行 func1 函数

    yield关键字

    def func1():
        yield 1
        yield from func2()
        yield 2
    
    
    def func2():
        yield 3
        yield 4
    
    
    f1 = func1()
    for item in f1:
        print(item)

    asyncio

    在Python3.4及之后的版本才能使用asyncio

    import asyncio
    
    
    @asyncio.coroutine
    def func1():
        print(1)
        yield from asyncio.sleep(2)  # 遇到IO耗时操作, 自动切换到tasks中的其他任务
        print(2)
    
    
    @asyncio.coroutine
    def func2():
        print(3)
        yield from asyncio.sleep(2)  # 遇到IO耗时操作, 自动切换到tasks中的其他任务
        print(4)
    
    
    tasks = [
        asyncio.ensure_future(func1()),
        asyncio.ensure_future(func2())
    ]
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))

    注意: asyncio遇到IO阻塞会自动切换, 而上述其他的操作遇到IO阻塞是手动切换的

    async&await关键字

    在Python3.5及之后的版本

    import asyncio
    
    
    async def func1():
        print(1)
        await asyncio.sleep(2)  # 遇到IO耗时操作, 自动切换到tasks中的其他任务
        print(2)
    
    
    async def func2():
        print(3)
        await asyncio.sleep(2)  # 遇到IO耗时操作, 自动切换到tasks中的其他任务
        print(4)
    
    
    tasks = [
        asyncio.ensure_future(func1()),
        asyncio.ensure_future(func2())
    ]
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))

    协程的意义

    在一个线程中如果遇到IO等待时间, 线程不会傻傻的等待, 会利用空闲的时候再去干其他的事情

    案例: 下载三张图片(网络IO)

    • 普通方式(同步)
    import requests
    
    
    def download_image(url):
        print('开始下载', url)
    
        response = requests.get(url)
        print('下载完成')
    
        # 图片保存到本地
        file_name = url.rsplit('_')[-1]
        with open(file_name, mode='wb') as file_object:
            file_object.write(response.content)
    
    
    if __name__ == '__main__':
        url_list = [
            'https://car2.autoimg.cn/cardfs/product/g28/M0A/19/BF/1024x0_1_q95_autohomecar__ChcCR13C4O2AHX-VAAhFPllxCw8142.jpg',
            'https://car3.autoimg.cn/cardfs/product/g28/M07/17/A2/1024x0_1_q95_autohomecar__ChsEfV3C4OyAYyFGAAkPNXso9Ts320.jpg',
            'https://car2.autoimg.cn/cardfs/product/g28/M02/17/A2/1024x0_1_q95_autohomecar__ChsEfV3C4OqAcAzoAAhDQ5rjm3k807.jpg'
        ]
        for item in url_list:
            download_image(item)
    View Code
    • 协程方式(异步)
    import aiohttp
    import asyncio
    
    
    async def fetch(session, url):
        print('发送请求', url)
        async with session.get(url, verify_ssl=False) as response:
            content = await response.content.read()
            file_name = url.rsplit('_')[-1]
            with open(file_name, mode='wb') as file_object:
                file_object.write(content)
        print('下载完成')
    
    
    async def main():
        async with aiohttp.ClientSession() as session:
            url_list = [
                'https://car2.autoimg.cn/cardfs/product/g28/M0A/19/BF/1024x0_1_q95_autohomecar__ChcCR13C4O2AHX-VAAhFPllxCw8142.jpg',
                'https://car3.autoimg.cn/cardfs/product/g28/M07/17/A2/1024x0_1_q95_autohomecar__ChsEfV3C4OyAYyFGAAkPNXso9Ts320.jpg',
                'https://car2.autoimg.cn/cardfs/product/g28/M02/17/A2/1024x0_1_q95_autohomecar__ChsEfV3C4OqAcAzoAAhDQ5rjm3k807.jpg'
            ]
            tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
    
            await asyncio.wait(tasks)
    
    
    if __name__ == '__main__':
        asyncio.run(main())
    View Code

    异步编程

    事件循环

    事件循环可以理解成为一个死循环, 会去检查并执行某些代码。

    # 伪代码
    
    任务列表 = [ 任务1, 任务2, 任务3, ...]
    
    while True:
        可执行的任务列表, 已完成的任务列表 = 去任务列表中检查所有的任务, 将'可执行''已完成'的任务返回
        
        for 就绪任务 in 可执行的任务列表:
            执行就绪任务
        
        for 已完成的任务 in 已完成的任务列表:
            在任务列表中移除已完成的任务
    
        如果 任务列表中的的任务都已完成, 则终止循环
    import asyncio
    
    # 去生成或获取一个事件循环
    loop = asyncio.get_event_loop()
    
    # 将任务放到"任务列表"
    loop.run_until_complete(任务)

    快速上手

    协程函数, 定义函数的时候async def 函数名, 则称该函数为协程函数

    协程对象, 执行协程函数()可以得到协程对象

    async def func():
        pass
    
    result = func()

    注意: 执行协程函数创建协程对象的时候, 函数内部代码不会执行

    如果想要运行协程函数内部代码, 必须要将协程对象交给事件循环来处理

    import asyncio
    
    
    async def func():
        print('gogogo')
    
    
    result = func()
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(result)

    await关键字

    await关键字 + 可等待的对象(协程对象、Future、Task对象 -> IO等待)

    示例1

    import asyncio
    
    async def func():
        print('来玩啊')
        response = await asyncio.sleep(2)
        print('结束', response)
    
    
    asyncio.run( func() )

    示例2

    import asyncio
    
    """
    async def func():
        print('来玩啊')
        response = await asyncio.sleep(2)
        print('结束', response)
    
    
    asyncio.run(func())
    """
    
    
    async def others():
        print('start')
        await asyncio.sleep(2)
        print('end')
    
        return '返回值'
    
    
    async def func():
        print('执行协程函数内部代码')
    
        # 遇到IO操作挂起当前协程(任务), 等IO操作完成之后再继续往下执行, 当前协程挂起时, 事件循环可以去执行其他协程(任务)
        response = await others()
    
        print('IO请求结束, 结果为:', response)
    
    
    asyncio.run(func())

    示例3

    async def others():
        print('start')
        await asyncio.sleep(2)
        print('end')
    
        return '返回值'
    
    
    async def func():
        print('执行协程函数内部代码')
    
        # 遇到IO操作挂起当前协程(任务), 等IO操作完成之后再继续往下执行, 当前协程挂起时, 事件循环可以去执行其他协程(任务)
        response1 = await others()
        print('IO请求结束, 结果为:', response1)
    
        response2 = await others()
        print('IO请求结束, 结果为:', response2)
    
    
    asyncio.run(func())

    await就是等待对象的值得到结果之后再继续向下执行。

    Task对象

    Tasks are used to schedule coroutines concurrently

    When a coroutine is wrapped into a Task with functions like asyncio.create_task() the coroutine is automatically scheduled to run soon

    Task对象是用来在事件循环中添加多个任务的

    Task用于并发调度协程, 通过asyncio.create_task(协程对象)的方式创建Task对象, 这样可以让协程加入事件循环中等待被调度执行。除了使用asyncio.create_task()函数以外, 还可以用低层级的loop.create_task()或ensure_future()函数。不建议手动实例化Task对象

    注意: asyncio.create_task()函数在Python3.7中被加入。在Python3.7之前, 可改用低层级的asyncio.ensure_future()函数

    示例1:

    import asyncio
    
    
    async def func():
        print(1)
        await asyncio.sleep(2)
        print(2)
        return '返回值'
    
    
    async def main():
        print('main开始')
    
        # 创建Task对象, 将当前执行func函数的任务添加到事件循环
        task1 = asyncio.create_task(func())
    
        # 创建Task对象, 将当前执行func函数的任务添加到事件循环
        task2 = asyncio.create_task(func())
    
        print('main结束')
    
        # 当执行某协程遇到IO的时候, 会自动化切换执行其他任务
        # 此处的await是等待相对于的协程全都执行完毕并获取数据
        ret1 = await task1
        ret2 = await task2
        print(ret1, ret2)
    
    
    asyncio.run(main())

    示例2:

    async def func():
        print(1)
        await asyncio.sleep(2)
        print(2)
        return '返回值'
    
    
    async def main():
        print('main开始')
    
        task_list = [
            asyncio.create_task(func()),
            asyncio.create_task(func())
        ]
    
        print('main结束')
    
        done, pending = await asyncio.wait(task_list, timeout=1)
        print(done)
    
    
    asyncio.run(main())

    示例3:

    async def func():
        print(1)
        await asyncio.sleep(2)
        print(2)
        return '返回值'
    
    
    task_list = [
        func(),
        func()
    ]
    
    done, pending = asyncio.run(asyncio.wait(task_list))
    print(done)

    asyncio.Future对象

    A Future is a special low-level awaitable object that represents an eventual result of an asynchronous operation.

    Task继承Future, Task对象内部await结果的处理基于Future对象来的

    示例1:

    import asyncio
    
    async def main():
        # 获取当前事件循环
        loop = asyncio.get_running_loop()
        
        # 创建一个任务(Future对象), 这个任务什么都不干
        fut = loop.create_future()
    
        # 等待任务最终结果(Future对象), 没有结果则会一直等待下去
        data = await fut
        print(data)
    
    asyncio.run( main )

    示例2

    async def set_after(fut):
        await asyncio.sleep(2)
        fut.set_result('666')
    
    
    async def main():
        # 获取当前事件循环
        loop = asyncio.get_running_loop()
    
        # 创建一个任务(Future对象), 没绑定任何行为, 这个任务永远不知道什么时候结束.
        fut = loop.create_future()
    
        # 创建一个任务(Task对象), 绑定了set_after函数, 函数内部在2s之后会给fut赋值.
        # 即受到设置future任务的最终结果, 那么fut就可以结束了
        await loop.create_task(set_after(fut))
    
        # 等待Future对象获取最终结果, 否则一直等下去
        data = await fut
        print(data)
    
    
    asyncio.run(main())

    concurrent.futures.Future对象

    concurrent.funtures.Future对象是使用线程池、进程池实现异步操作时用到的对象。

    import time
    from concurrent.futures.thread import ThreadPoolExecutor
    from concurrent.futures.process import ProcessPoolExecutor
    
    def func(value):
        time.sleep(1)
        print(value)
        return 123
    
    # 创建线程池
    pool = ProcessPoolExecutor(max_workers=5)
    
    # 创建进程池
    # pool = ThreadPoolExecutor(max_workers=5)
    
    for i in range(10):
        fut = pool.submit(func, i)
        print(fut)
    import time
    import asyncio
    import concurrent.futures
    
    
    def func1():
        # 某个耗时操作
        time.sleep(2)
        return 'hello world'
    
    
    async def main():
        loop = asyncio.get_running_loop()
    
        # 1. Run in the default loop's executor(默认ThreadPoolExecutor)
        # 第一步: 内部会先调用ThreadPoolExecutor的submit方法去线程池中申请一个线程去执行fun1函数, 并返回一个concurrent.future.Future对象
        # 第二步: 调用asyncio.wrap_future对象将concurrent.futures.Future对象包装成asycio.Future对象
        # 因为concurrent.futures.Future对象不支持await语法, 所以需要包装成asyncio.Future对象才能使用
        fut = loop.run_in_executor(None, func1)
        result = await fut
        print('default thread pool', result)
    
        # 2. Run in a custom thread pool:
        # with concurrent.futures.ThreadPoolExecutor() as pool:
        #     result = await loop.run_in_executor(
        #         pool, func1
        #
        #     )
        #     print('custom thread pool', result)
    
        # 3. Run in a custom process pool:
        # with concurrent.futures.ProcessPoolExecutor() as pool:
        #     result = await loop.run_in_executor(
        #         pool, func1
        #     )
        #     print('custom process pool', result)
    
    asyncio.run(main())

    案例: asyncio + 不支持异步模块

    异步迭代器

    什么是异步迭代器

    实现了__aiter__()和__anext__()方法的对象。__anext__必须返回一个awaitable对象。async for会处理异步迭代器的__anext__()方法所返回的可迭代对象, 直到其引发一个StopAsyncIteration异常。

    什么是异步可迭代对象

    可在async for语句中被使用的对象。必须通过它的__aiter__()方法返回一个asyncchronous iterator。

    import asyncio
    
    
    class Reader(object):
        """ 自定义异步迭代器(同时也是异步可迭代对象) """
    
        def __init__(self):
            self.count = 0
    
        async def readline(self):
            # await asyncio.sleep(1)
            self.count += 1
            if self.count == 100:
                return None
            return self.count
    
        def __aiter__(self):
            return self
    
        async def __anext__(self):
            val = await self.readline()
            if val == None:
                raise StopAsyncIteration
            return val
    
    async def func():
        obj = Reader()
        async for item in obj:
            print(item)
    
    asyncio.run(func())

    异步上下文管理器

    异步上下文管理器对象通过定义__aenter__()和__aexit__()方法来对async with语句中的环境进行控制

    import asyncio
    
    
    class AsyncContextManager(object):
        # def __init__(self):
            # self.conn = conn
    
        async def do_something(self):
            # 异步操作数据库
            return 666
    
        async def __aenter__(self):
            # 异步链接数据库
            # self.conn = await asyncio.sleep(1)
            return self
    
        async def __aexit__(self, exc_type, exc_val, exc_tb):
            # 异步关闭数据库
            await asyncio.sleep(1)
    
    
    async def main():
        async with AsyncContextManager() as f:
            result = await f.do_something()
            print(result)
    
    
    if __name__ == '__main__':
        asyncio.run(main())

    uvloop

    是asyncio的事件循环的替代方案。uvloop事件循环大于默认asyncio的事件循环

    pip install uvloop
    import asyncio
    import uvloop
    
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    
    # 编写asyncio的代码, 与之前的代码一致
    
    # 内部的事件循环自动化会变成uvloop
    asyncio.run(...)

    案例

    异步操作Redis

    在使用Python代码操作Redis时, 链接/操作/断开都是网络IO

    pip install ailredis

    示例1:

    import asyncio
    import aioredis
    
    
    async def execute(address, password):
        print('开始执行: ', address)
        # 网络IO操作: 创建Redis连接
        redis = aioredis.create_redis(address, password=password)
    
        # 网络IO操作: 在Redis中设置哈希值car, 内部再设置三个键值对, 即: redis = {car: {key1:1, key2:2, key3:3}}
        await redis.hmset_dict('car', key1=1, key2=2, key3=3)
    
        # 网络IO操作: 去Redis中获取值
        result = await redis.hgetall('car', encoding='utf-8')
        print(result)
    
        redis.close()
        # 网络IO操作: 关闭Redis连接
        await redis.wait_closed()
    
        print('结束: ', address)
    
    asyncio.run(execute('redis://127.0.0.1:6379', 'redis'))

    示例2:

    import asyncio
    import aioredis
    
    
    async def execute(address, password):
        print('开始执行: ', address)
        # 网络IO操作: 创建Redis连接
        redis = aioredis.create_redis(address, password=password)
    
        # 网络IO操作: 在Redis中设置哈希值car, 内部再设置三个键值对, 即: redis = {car: {key1:1, key2:2, key3:3}}
        await redis.hmset_dict('car', key1=1, key2=2, key3=3)
    
        # 网络IO操作: 去Redis中获取值
        result = await redis.hgetall('car', encoding='utf-8')
        print(result)
    
        redis.close()
        # 网络IO操作: 关闭Redis连接
        await redis.wait_closed()
    
        print('结束: ', address)
    
    task_list = [
        execute('redis://127.0.0.1:6379', 'redis'),
        execute('redis://127.0.0.1:6379', 'redis'),
    ]
    
    asyncio.run(asyncio.wait(task_list))

    异步MySQL

    pip install aiomysql

    示例1

    import asyncio
    import aiomysql
    
    
    async def execute():
        # 网络IO操作: 连接MySQL
        conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='123', db='mysql')
    
        # 网络IO操作: 创建CURSOR
        cursor = await conn.cursor()
    
        # 网络IO操作: 执行SQL
        await cursor.execute('SELECT HOST,User FROM user')
    
        # 网络IO操作: 获取SQL结果
        result = await cursor.fetchall()
        print(result)
    
        # 网络IO操作: 关闭连接
        await cursor.close()
        conn.close()
    
    
    asyncio.run(execute())

    示例2

    import asyncio
    import aiomysql
    
    
    async def execute(host, port, user, password, db):
        # 网络IO操作: 连接MySQL
        conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='123', db='mysql')
    
        # 网络IO操作: 创建CURSOR
        cursor = await conn.cursor()
    
        # 网络IO操作: 执行SQL
        await cursor.execute('SELECT HOST,User FROM user')
    
        # 网络IO操作: 获取SQL结果
        result = await cursor.fetchall()
        print(result)
    
        # 网络IO操作: 关闭连接
        await cursor.close()
        conn.close()
    
    
    task_list = [
        execute(host='127.0.0.1', port=3306, user='root', password='123', db='mysql'),
        execute(host='127.0.0.1', port=3306, user='root', password='123', db='mysql'),
    ]
    
    asyncio.run(asyncio.wait(task_list))

    异步爬虫

    import aiohttp
    import asyncio
    
    
    async def fetch(session, url):
        print('发送请求:', url)
        async with session.get(url, verify_ssl=False) as response:
            text = await response.text()
            print('得到结果:', url, len(text))
    
    
    async def main():
        async with aiohttp.ClientSession() as session:
            url_list = [
                'https://python.org',
                'https://www.baidu.com',
                'http://www.pythonav.com'
            ]
            tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
    
            await asyncio.wait(tasks)
    
    
    if __name__ == '__main__':
        asyncio.run(main())
  • 相关阅读:
    14.3.2.1 Transaction Isolation Levels 事务隔离级别
    ReentrantLock可重入锁
    Lock与synchronized 的区别
    synchronized 与 Lock
    This usually indicates a missing no-arg constructor or that the editor's class name was mistyped in
    Oracle dump 分析secondary key
    java.lang.ClassNotFoundException: org.springframework.web.util.IntrospectorCleanupListener
    Oracle 验证IOT表数据存储在主键里
    Cause: java.lang.IllegalArgumentException: Mapped Statements collection does not contain value for U
    Oracle heap 表的主键 dump 分析
  • 原文地址:https://www.cnblogs.com/featherwit/p/13404115.html
Copyright © 2011-2022 走看看