zoukankan      html  css  js  c++  java
  • 流畅的python,Fluent Python 第十八章笔记 (使用asyncio包处理并发)

    书中的代码用的应该是Python3.3的,从Python3.5开始协程用async 与await 代替了@asyncio.coroutine与yield.from

    话说asyncio与aiohttp配合使用,从书中的教程来看真的非常强大,以后的并发io处理,协程才是王道了。

    18.1线程与协程对比

    import threading
    import itertools
    import time
    import sys
    
    
    class Signal:
        go = True
    
    
    def spin(msg, signal):
        write, flush = sys.stdout.write, sys.stdout.flush
        for char in itertools.cycle('|/-\'):
            status = char + ' ' + msg
            write(status)
            flush()
            write('x08' * len(status))
            time.sleep(.1)
            if not signal.go:
                break
        write(' ' * len(status) + 'x08' * len(status))
    
    
    def slow_function():
        time.sleep(3)
        return 42
    
    
    def supervisor():
        singal = Signal()
        spinner = threading.Thread(target=spin, args=('thinking!', singal))
        print('spinner object:', spinner)
        # spinner线程开始
        spinner.start()
        # 主线程休眠
        result = slow_function()
        # 给spinner线程发送关闭信号
        singal.go = False
        spinner.join()
        return result
    
    
    def main():
        result = supervisor()
        print('Answer', result)
    
    
    if __name__ == '__main__':
        main()
    

     这个一个多线程的显示脚本,一共两个线程,主线程负责显示最后的输出,开辟的子线程负责显示转圈圈。

    import asyncio
    import itertools
    import sys
    
    
    
    async def spin(msg):
        write, flush = sys.stdout.write, sys.stdout.flush
        for char in itertools.cycle('|/-\'):
            status = char + ' ' + msg
            write(status)
            flush()
            # 继续回到初始位置
            write('x08' * len(status))
            try:
                await asyncio.sleep(.1)
            # 捕获错误
            except asyncio.CancelledError:
                break
        write(' ' * len(status) + 'x08' * len(status))
    
    
    
    async def slow_function():
        await asyncio.sleep(3)
        return 42
    
    
    
    async def supervisor():
        # 把协程包装成为一个task任务
        spinner = asyncio.ensure_future(spin('thinking!'))
        # 已经是一个任务返回的还是一个任务
        spinner = asyncio.ensure_future(spinner)
        print('spinner object:', spinner)
        # 激活等待slow_function任务,由于slow_function里面有sleep,会把控制权,到时候会把控制权转给spinnner
        slow_function1 = asyncio.ensure_future(slow_function())
        result = await slow_function1
        # 得到result,取消spinner任务
        # await asyncio.sleep(5)
        spinner.cancel()
        return result
    
    def main():
        loop = asyncio.get_event_loop()
        result = loop.run_until_complete(supervisor())
        loop.close()
        print('Answer', result)
    
    if __name__ == '__main__':
        main()
    

     这是一个协程版本的,最后的协程supervisor()里面一共有两个任务,通过await asyncio.sleep()切换协程的工作。

    asyncio.ensure_future(...)接收的是一个协程,排定它的运作时间,排定它的运行时间,然后返回个asyncio.Task实例,也就是asyncio.Future的实例,因为Task
    是Future的子类,用于包装协程。

    task或者future都有.done(), .add_done_callback(....)和.result()等方法,只不过这些方法一般用的比较少,只要result=await myfuture(),其中await后面需要回调的参数就是,result就是task的result。

    无需调用my_future.add_done_callback(...),因为可以直接把想在future运行结束后执行的操作放在协程中 await my_futre表达式的后面。这个是协程的一大优势:协程是可以暂停和恢复函数的。

    无需调用my_future.result(),因为await 从 future中产出的值就是结果。(列如,result = await my_future)。

    18.2使用asyncio和aiohttp包下载

    重点记录:在一个asyncio中,基本的流程是一样的:在一个单线程程序中使用主循环一次激活队列里的携程。各个协程向前执行几步,然后把控制权让给主循环,主循环再激活队列里的下一个协程。

    import asyncio
    import aiohttp
    
    from flags import BASE_URL, save_flag, show, main
    
    
    async  def get_flag(cc):
        url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
        async with aiohttp.ClientSession() as session:
            # 运行底层库函数session.get(url)
            async with session.get(url) as resp:
                # print(type(resp))
                # resp.read()是一个协程,必须通过await获取响应内容
                image = await resp.read()
        return image
    
    
    async def download_one(cc):
        # 管道传递给下一个协程函数
        image = await get_flag(cc)
        show(cc)
        save_flag(image, cc.lower() + '.gif')
        return cc
    
    
    def download_many(cc_list):
        # 创建事件循环
        loop = asyncio.get_event_loop()
        to_do = [download_one(cc) for cc in sorted(cc_list)]
        # print(type(to_do[0]))
        wait_coro = asyncio.wait(to_do)
        # 运行wait_coro里面的task任务
        res, _ = loop.run_until_complete(wait_coro)
        # print(res , _)
        loop.close()
        return len(res)
    
    if __name__ == '__main__':
        main(download_many)
    

     一个简单版本的协程下载国旗脚本。

    书中最后的重点说到,我们使用asyncio包时,由asyncio包实现的事件循环去做,比如asyncio包API的某个函数(如loop.run_until_complete(...))

    我们编写的协程链条最终通过await 把职责委托给asyncio包中的某个协程函数或协程方法(比如await asyncio.sleep(...),或者其他库中实现高层协议的协程(比如resp = await resp.read())

    也就是说,最内层的子生成器是库中真正执行I/O操作的函数,而不是我们编写的函数。(言下之意,就是I/O操作必须是高层协议提供的协程)

    概括起来就是:使用asyncio包时,我们编写的异步代码中包含由asyncio本身驱动的协程(即委派生成器),而生成器最终把职责委托给asyncio包或第三方库(如aiohttp)中的协程。

    这种方式相当于架起了管道,让asyncio事件循环(通过我们编写的协程)驱动执行底层异步I/O操作的库函数。

    18.3避免阻塞性调用。

    有两种方法能避免阻塞调用终止整个应用程序的进程:

    1、在单独的线程中运行呵呵阻塞型操作。

    2、把每个阻塞型操作转换成非阻塞的异步操作。

    18.4改进asyncio下载版本。

    可以接收错误,并且在保存图片的时候,也采取了异步的措施,但实际操作中,异步保存图片文件过大会报错,不知道为什么。

    import asyncio
    import collections
    
    import aiohttp
    from aiohttp import web
    import tqdm
    
    from flags2_common import  main, HTTPStatus, Result, save_flag
    
    # 默认设为较小的值,防止远程网络出错
    DEFAULT_CONCUR_REQ = 5
    MAX_CONCUR_REQ = 1000
    
    
    class FetchError(Exception):
        def __init__(self, country_code):
            # 继承父类的args添加属性,pythoncookbook书中介绍最好这样写
            super(FetchError, self).__init__(country_code)
            self.country_code = country_code
    
    async def get_flag(base_url, cc):
        url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
        async with aiohttp.ClientSession() as session:
            # 运行底层库函数session.get(url)
            async with session.get(url) as resp:
                # 如果响应正常
                if resp.status == 200:
                    image = await resp.read()
                    return image
                # 404 上报内置的定点错误
                elif resp.status == 404:
                    raise web.HTTPNotFound
                # 其他错误收集错误码信息上报
                else:
                    raise aiohttp.ClientHttpProxyError(
                        code=resp.status, message=resp.reason,
                        headers=resp.headers
                    )
    
    
    async def down_load_one(cc, base_url, semaphore, verbose):
        # 404与正常下载返回对象关系,其他上浮错误给调用者
        try:
            # 在标记的最大协程数量下工作
            with (await semaphore):
                image = await get_flag(base_url, cc)
        # 抓取上浮的404的错误
        except web.HTTPNotFound:
            # 用Enum关系表达式保存status
            status = HTTPStatus.not_found
            msg = 'not found'
        # 其它引起的报错为基础错误Exception,上浮给调用者
        except Exception as exc:
            raise FetchError(cc) from exc
        else:
            # 正常情况下,保存图片
            # run_in_executor方法的第一个参数是Executor实例;如果设为None,使用事件循环的默认为ThreadPoolExecutor
            loop = asyncio.get_event_loop()
            loop.run_in_executor(None,
                                 save_flag, image, cc.lower() + '.gif')
            # save_flag(image, cc.lower() + '.gif')
            status = HTTPStatus.ok
            msg = 'ok'
        if verbose and msg:
            print(cc, msg)
        # 返回namedtuple对象
        return Result(status, cc)
    
    
    async def downloader_coro(cc_list, base_url, verbose, concur_req):
        # 计数器
        counter = collections.Counter()
        # 标记协程最大对象
        semaphore = asyncio.Semaphore(concur_req)
        # 协程列表
        to_do = [
            down_load_one(cc, base_url, semaphore, verbose)
            for cc in cc_list
        ]
        # 完成的工作任务
        to_do_iter = asyncio.as_completed(to_do)
        if not verbose:
            to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))
        for future in to_do_iter:
            try:
                res = await future
            # 获取调用者的错误信息
            except FetchError as exc:
                country_code = exc.country_code
                try:
                    # 尝试获取引起错误的实例的参数args
                    error_msg = exc.__cause__.args[0]
                except IndexError:
                    # 如果取不到索引获取引起错误的实例的类的名字
                    error_msg = exc.__cause__.__class__.__name
                if verbose and error_msg:
                    msg = '*** Error for {}: {}'
                    print(msg.format(country_code, error_msg))
                # 其他的错误类型用Enum的最后一种关系表达式
                status = HTTPStatus.error
            else:
                status = res.status
            # 统计各种状态
            counter[status] += 1
    
        return counter
    
    def download_many(cc_list, base_url, verbose, concur_req):
        loop = asyncio.get_event_loop()
        coro = downloader_coro(cc_list, base_url, verbose, concur_req)
        counts = loop.run_until_complete(coro)
        loop.close()
        # print(counts)
        # 返回运行完成后的统计字典
        return counts
    
    
    if __name__ == '__main__':
        main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
    

     这个代码我只能看懂,逻辑实在是紧密,让我学到了很多。

    18.5 从回调到future和协程

    使用await结果做异步编程,无需使用回调。能够避免回调函数的回调地狱。

    在我的实际使用中,如果多个协程进行逻辑上面从上到下的运行,且存在上下文关系,中间不能插入普通函数参与上下文的工作中,因为在协程的任务运行期间,普通函数根本没有机会获得控制权,也就没有机会运行。

    import asyncio
    import collections
    
    import aiohttp
    from aiohttp import web
    import tqdm
    
    from flags2_common import  main, HTTPStatus, Result, save_flag
    
    # 默认设为较小的值,防止远程网络出错
    DEFAULT_CONCUR_REQ = 5
    MAX_CONCUR_REQ = 1000
    
    
    class FetchError(Exception):
        def __init__(self, country_code):
            # 继承父类的args添加属性,pythoncookbook书中介绍最好这样写
            super(FetchError, self).__init__(country_code)
            self.country_code = country_code
    
    async def http_get(url):
        async with aiohttp.ClientSession() as session:
            # 运行底层库函数session.get(url)
            async with session.get(url) as resp:
                if resp.status == 200:
                    # 获取数据的类型
                    ctype = resp.headers.get('Content-type','').lower()
                    # print(ctype)
                    # 如果json在请求头或者url尾巴里面,发挥json数据
                    if 'json' in ctype or url.endswith('json'):
                        data = await resp.json()
                        # print(data)
                    else:
                        data = await resp.read()
                    # print(data)
                    return data
                elif resp.status == 404:
                    return web.HTTPNotFound
                else:
                    raise aiohttp.ClientHttpProxyError(
                        code=resp.status,
                        message=resp.reason,
                        headers=resp.headers
                    )
    
    
    async def get_country(base_url, cc):
        url = '{}/{cc}/metadata.json'.format(base_url, cc=cc.lower())
        # print(url) 返回国家的信息
        metadata = await http_get(url)
        return metadata['country']
    
    
    async def get_flag(base_url, cc):
        url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
        # print(url)
        return (await http_get(url))
    
    
    async def down_load_one(cc, base_url, semaphore, verbose):
        # 404与正常下载返回对象关系,其他上浮错误给调用者
        try:
            # 在标记的最大协程数量下工作
            async with semaphore:
                image = await get_flag(base_url, cc)
                # print(image)
            async with semaphore:
                country = await get_country(base_url, cc)
        # 抓取上浮的404的错误
        except web.HTTPNotFound:
            # 用Enum关系表达式保存status
            status = HTTPStatus.not_found
            msg = 'not found'
        # 其它引起的报错为基础错误Exception,上浮给调用者
        except Exception as exc:
            raise FetchError(cc) from exc
        else:
            # 正常情况下,保存图片
            # run_in_executor方法的第一个参数是Executor实例;如果设为None,使用事件循环的默认为ThreadPoolExecutor
            country = country.replace(' ','_')
            filename = '{}-{}.gif'.format(country, cc)
            loop = asyncio.get_event_loop()
            loop.run_in_executor(None,
                                 save_flag, image, filename)
            # save_flag(image, cc.lower() + '.gif')
            status = HTTPStatus.ok
            msg = 'ok'
        if verbose and msg:
            print(cc, msg)
        # 返回namedtuple对象
        return Result(status, cc)
    
    
    async def downloader_coro(cc_list, base_url, verbose, concur_req):
        # 计数器
        counter = collections.Counter()
        # 标记协程最大对象
        semaphore = asyncio.Semaphore(concur_req)
        # 协程列表
        to_do = [
            down_load_one(cc, base_url, semaphore, verbose)
            for cc in cc_list
        ]
        # 完成的工作任务
        to_do_iter = asyncio.as_completed(to_do)
        if not verbose:
            to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))
        for future in to_do_iter:
            try:
                res = await future
            # 获取调用者的错误信息
            except FetchError as exc:
                country_code = exc.country_code
                try:
                    # 尝试获取引起错误的实例的参数args
                    error_msg = exc.__cause__.args[0]
                except IndexError:
                    # 如果取不到索引获取引起错误的实例的类的名字
                    error_msg = exc.__cause__.__class__.__name
                if verbose and error_msg:
                    msg = '*** Error for {}: {}'
                    print(msg.format(country_code, error_msg))
                # 其他的错误类型用Enum的最后一种关系表达式
                status = HTTPStatus.error
            else:
                status = res.status
            # 统计各种状态
            counter[status] += 1
    
        return counter
    
    def download_many(cc_list, base_url, verbose, concur_req):
        loop = asyncio.get_event_loop()
        coro = downloader_coro(cc_list, base_url, verbose, concur_req)
        counts = loop.run_until_complete(coro)
        loop.close()
        # print(counts)
        # 返回运行完成后的统计字典
        return counts
    
    
    if __name__ == '__main__':
        main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
    

     上面是按照书上的写法,自己做了一些标识,这里面最终调用的最底层运行异步的函数就是

     
    import asyncio
    import collections
    
    import aiohttp
    from aiohttp import web
    import tqdm
    
    from flags2_common import  main, HTTPStatus, Result, save_flag
    
    # 默认设为较小的值,防止远程网络出错
    DEFAULT_CONCUR_REQ = 5
    MAX_CONCUR_REQ = 1000
    
    
    class FetchError(Exception):
        def __init__(self, country_code):
            # 继承父类的args添加属性,pythoncookbook书中介绍最好这样写
            super(FetchError, self).__init__(country_code)
            self.country_code = country_code
    
    async def http_get(url):
        async with aiohttp.ClientSession() as session:
            # 运行底层库函数session.get(url)
            async with session.get(url) as resp:
                if resp.status == 200:
                    ctype = resp.headers.get('Content-type','').lower()
                    # 如果是json返回JSON数据
                    if 'json' in ctype or url.endswith('json'):
                        data = await resp.json()
                        # print(data)
                    else:
                        data = await resp.read()
                    # print(data)
                    return data
                elif resp.status == 404:
                    return web.HTTPNotFound
                else:
                    raise aiohttp.ClientHttpProxyError(
                        code=resp.status,
                        message=resp.reason,
                        headers=resp.headers
                    )
    
    
    async def get_country(base_url, cc):
        url = '{}/{cc}/metadata.json'.format(base_url, cc=cc.lower())
        # print(url) 返回国家的信息
        metadata = await http_get(url)
        return metadata['country']
    
    
    async def get_flag(base_url, cc):
        url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
        # print(url)
        return (await http_get(url))
    
    
    async def down_load_one(cc, base_url, semaphore, verbose):
        # 404与正常下载返回对象关系,其他上浮错误给调用者
        try:
            # 在标记的最大协程数量下工作
            async with semaphore:
                image = await get_flag(base_url, cc)
                # print(image)
            async with semaphore:
                country = await get_country(base_url, cc)
        # 抓取上浮的404的错误
        except web.HTTPNotFound:
            # 用Enum关系表达式保存status
            status = HTTPStatus.not_found
            msg = 'not found'
        # 其它引起的报错为基础错误Exception,上浮给调用者
        except Exception as exc:
            raise FetchError(cc) from exc
        else:
            # 正常情况下,保存图片
            # run_in_executor方法的第一个参数是Executor实例;如果设为None,使用事件循环的默认为ThreadPoolExecutor
            country = country.replace(' ','_')
            filename = '{}-{}.gif'.format(country, cc)
            loop = asyncio.get_event_loop()
            loop.run_in_executor(None,
                                 save_flag, image, filename)
            # save_flag(image, cc.lower() + '.gif')
            status = HTTPStatus.ok
            msg = 'ok'
        if verbose and msg:
            print(cc, msg)
        # 返回namedtuple对象
        return Result(status, cc)
    
    
    async def downloader_coro(cc_list, base_url, verbose, concur_req):
        # 计数器
        counter = collections.Counter()
        # 标记协程最大对象
        semaphore = asyncio.Semaphore(concur_req)
        # 协程列表
        to_do = [
            down_load_one(cc, base_url, semaphore, verbose)
            for cc in cc_list
        ]
        # 完成的工作任务
        to_do_iter = asyncio.as_completed(to_do)
        if not verbose:
            to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))
        for future in to_do_iter:
            try:
                res = await future
            # 获取调用者的错误信息
            except FetchError as exc:
                country_code = exc.country_code
                try:
                    # 尝试获取引起错误的实例的参数args
                    error_msg = exc.__cause__.args[0]
                except IndexError:
                    # 如果取不到索引获取引起错误的实例的类的名字
                    error_msg = exc.__cause__.__class__.__name
                if verbose and error_msg:
                    msg = '*** Error for {}: {}'
                    print(msg.format(country_code, error_msg))
                # 其他的错误类型用Enum的最后一种关系表达式
                status = HTTPStatus.error
            else:
                status = res.status
            # 统计各种状态
            counter[status] += 1
    
        return counter
    
    def download_many(cc_list, base_url, verbose, concur_req):
        loop = asyncio.get_event_loop()
        coro = downloader_coro(cc_list, base_url, verbose, concur_req)
        counts = loop.run_until_complete(coro)
        loop.close()
        # print(counts)
        # 返回运行完成后的统计字典
        return counts
    
    
    if __name__ == '__main__':
        main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
    
    
    

     从代码中可以看到,在单个下载down_load_one协程中,添加了两个协程任务。

    但最终实现I/O操作的还是

    async with aiohttp.ClientSession() as session:

    # 运行底层库函数session.get(url)

    async with session.get(url) as resp

    18.6使用sayncno包编写服务器。

    这个能力有限,理解起来太累,回头有空看了再写。

    
    
    
  • 相关阅读:
    Docker GitLab镜像部署
    Kubernetes集群部署之三ETCD集群部署
    Kubernetes集群部署之二CA证书制作
    Kubernetes集群部署之一系统环境初始化
    docker开源仓库Harbor部署笔记
    git分支
    git高清技能图
    vue+uwsgi+nginx部署路飞学城
    git基础
    git安装与初始化
  • 原文地址:https://www.cnblogs.com/sidianok/p/12231548.html
Copyright © 2011-2022 走看看