zoukankan      html  css  js  c++  java
  • 从零开始学asyncio(下)

      本篇文章主要是对asyncio和相关内容的补充, 以及一个异步爬虫实例. 这个系列还有另外两篇文章:

    一. 使用同步代码

      上一篇文章已经讲到, 使用asyncio模块的基本套路是, 把要执行的代码写成协程函数的形式, 在函数内部IO操作的部分使用await挂起任务. 最后将协程给asyncio运行即可. 

      假设现在需要使用requests库请求数据:

    import requests
    
    
    def fetch(url):
        response = requests.get(url)
        # 以下省略一万字

    显然, 对一个url的get请求属于耗时的IO操作, 但是requests是个同步库, 没法await, 如果直接以同步的方式运行, 那么又不能发挥出异步的效率优势.

      这种必须使用同步代码的情况下,  可以这样做:

    import asyncio
    import requests
    
    
    async def fetch(url):
        loop = asyncio.get_running_loop()
        asyncIO = loop.run_in_executor(None, requests.get, url)
        response = await asyncIO
        # 继续省略一万字

       loop就是当前线程运行的事件循环, 调用它的run_in_execcutor方法, 可以将一个同步代码的函数封装为future对象, 然后就可以对其await了.

      这个方法接受三个参数: executor, func和*args, 其中executor为线程池或者进程池(concurrent.futures.ThreadPoolExecutor&ProcessPoolExecutor), 如果传入None则使用默认的线程池.  func和*args就是同步函数以及对应的参数. 这个方式的本质是将同步代码放在其它的线程或者进程运行, 来避免其阻塞主线程, 故只是权宜之计.

      目前已经有越来越多支持asyncio的异步库, 比如上面说的requests库, 就可以用aiohttp来替代. 在https://github.com/timofurrer/awesome-asyncio可以找到支持用asyncio异步的库.

    二. async语句的其它用法

      async除了用于定义协程函数外, 另两种用法是async with和async for, 这两种语句和await一样有异步属性, 必须在协程函数内使用.

    1. async with

      async with和with类似, async with的对象应该实现__aenter__和__aexit__方法, 并且这两个方法是异步的.

      一个简单的async with实例如下:

    import asyncio
    
    
    class Lock:
    
        def __init__(self):
            self._locked = False
    
        async def __aenter__(self):
            while self._locked:
                await asyncio.sleep(0)
            self._locked = True
            return self
    
        async def __aexit__(self, exc_type, exc, tb):
            self._locked = False
    
    
    lock = Lock()
    
    
    async def coro():
        async with lock:
            pass
    
    
    asyncio.run(coro())

    这里实现了一个简单的协程锁, 其中__aenter__和__aexit__方法分别用于请求和释放锁.

    2. async for

      async for也和for类似, 后面接的迭代对象需要实现__aiter__和__anext__两个方法, 其中__anext__方法是异步的.

      一个简单的async for实例如下:

    import asyncio
    
    
    class Xrange:
    
        def __init__(self, lower, upper):
            self._num = lower-1
            self.upper = upper
    
        def __aiter__(self):
            return self
    
        async def __anext__(self):
            self._num += 1
            if self._num < self.upper:
                return self._num
            raise StopAsyncIteration
    
    
    async def coro():
        async for i in Xrange(1, 10):
            print(i)
    
    
    asyncio.run(coro())

     需要注意的是, 要在__anext__方法中迭代结束的位置引发StopAsyncIteration, 否则这个对象就会永远迭代下去.

    三. 协程补充

      除了send方法之外, 协程对象常见的方法还有throw和close, 前者在协程内部引发异常, 后者将协程停止.

    1. coro.throw()

      顾名思义, throw方法就是把一个异常扔进协程内部, 它需要三个参数: 
    coro.throw(type, value=None, traceback=None)
    其中type为错误类型, value为错误值, traceback的话, 官网没找到说明, 我也不知道是什么.
      现在测试一下throw方法:
    class Awaitable:
    
        def __await__(self):
            yield
    
    
    async def coro():
        while 1:
            aw = Awaitable()
            await aw
    
    
    c = coro()
    # 如果不预先激活协程就直接调用throw,那么throw方法就会在async def coro这一行引发异常
    c.send(None)
    c.throw(Exception, 'haha')

    运行结果如下:

      如果一个协程没有被激活, 那么调用throw, 就会直接在定义协程的那个位置抛出异常, 否则, 协程卡在哪个yield, 就在那个位置抛出异常. 当然, 协程也可以在内部捕获这个异常.

      除此之外, 其实throw和send是相似的, 二者都会获得yield返回的值. 并且, throw和send一样会驱动协程, 前提是协程能捕获到这个异常而不退出.

    2. coro.close()

      close函数用于关闭协程, 使之完全停止. 这个函数其实是使用了throw函数, 伪码如下:

    def close(self):
        try:
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass
        else:
            raise RuntimeError("generator ignored GeneratorExit")

      其中GeneratorExit继承自BaseException, 也是异常的一种. 因此, close方法其实就是通过往协程内部扔异常的方式让协程停止, 如果这个协程执意要捕获异常而不停止的话, close方法就会抛出RuntimeError.

    四. task/future补充

       上一篇文章讲过, future用在协程内部, 主要特性就是暂停和回调:

    • 协程在这个位置暂停和切换, 因此可用于IO操作
    • 在协程被task封装的前提下, 调用set_result就等于结束本次暂停, 并将set_result的值返回给协程
    • 可以设置回调函数, 当暂停结束的时候调用回调

    task继承自future, 性质和future差不多, 另外还负责封装和驱动协程.

    1.task&future的创建与鉴别

      鉴别一个对象是不是task&future, 可以调用asyncio.isfuture, 这个函数会返回一个布尔值. 相应的, 调用asyncio.iscoroutine和asyncio.iscoroutinefunction就可以判断协程对象和协程函数.

      如果要创建task或者future对象, 可以调用以下函数;

    # 要把协程封装为task对象,可以使用以下两个函数:
    
    # 这个函数必须在有事件循环运行时调用,不然报错
    # 一般来说,在协程中调用就行
    asyncio.create_task(coro)
    
    # 这个函数不光可以接收协程对象,只要是可等待对象都可以
    # 如果没有事件循环, 它就会调用get_event_loop获取
    asyncio.ensure_future(aw)
    
    
    # 或者在获取到当前事件循环的前提下
    loop.create_task()
    
    
    # 如果要创建future对象,可以使用以下方法:
    
    asyncio.Future()
    
    # 或者在获取到当前事件循环的前提下
    loop.create_future()

    2.取消task&future

      task&future有pending, 和done两种状态, 其中done又可以再分为finished和cancelled. 简单来说, 还能运行就是pending, 正常结束就是finished. 被取消了就是cancelled, 调用cancel方法就可以取消一个task&future对象.

      现在使用如下代码取消一个future对象:

    import asyncio
    
    
    loop = asyncio.get_event_loop()
    
    
    async def coro():
        fut = loop.create_future()
    
        def cb(fut):
            print('这是一个回调函数')
        fut.add_done_callback(cb)
        # 在一秒之后取消fut
        loop.call_later(1, fut.cancel)
        try:
            await fut
        except Exception as e:
            print('异常:', type(e))
        print('future对象:', fut)
    
    
    loop.run_until_complete(coro())

    运行结果如下:

      首先, cancel方法会处理所有的回调函数, 这一点和set_result方法是一样的. 在这之后, future.__await__方法内部会引发CancelledError, 如果协程不捕获的话, 这整个协程就停止了. 最后可以看到, fut对象的状态变为cancelled. 简单点说, 就是首先处理回调函数, 然后往协程里面throw一个CancelledError使之结束.

      task的cancel方法与future类似, 首先处理回调函数, 然后调用future.cancel来取消协程.

    五. 事件循环

      上一篇中讲过了, 事件循环相当于一个调度者, 对多个task进行管理, 当触发到事件时, 就驱动对应的task运行.

    1. 获取和设置事件循环

      使用以下函数获取或者设置事件循环:

    # 如果没有正在运行的事件循环, 下面这个函数会报错
    # 因此, 只能在协程或者相关的回调中调用这个函数
    asyncio.get_running_loop()
    # 获取事件循环, 没有就会新建一个
    asyncio.get_event_loop()
    # 新建一个事件循环
    asyncio.new_event_loop()
    # 将loop绑定到当前的线程中
    asyncio.set_event_loop(loop)

    事件循环对象绑定了很多方法, 主要是运行回调, 网络通信等方面的, 可以看https://docs.python.org/zh-cn/3/library/asyncio-eventloop.html.

    2. 令人迷惑的点

      不论是future还是task, 都需要依赖事件循环来调度. 显然, 在一个线程只需要一个事件循环, 否则会造成程序混乱.

      现在运行下面这段程序:

    import asyncio
    
    
    loop = asyncio.get_event_loop()
    
    
    async def coro():
        fut = loop.create_future()
        loop.call_later(1, fut.set_result, None)
        await fut
        print('the end')
    
    
    asyncio.run(coro())

    运行结果如下:

      这是因为, asyncio.run会创建一个事件循环A来运行传入的协程对象, 但是, 这个协程自身又通过事件循环B来创建了一个future对象, 那么这个future对象就绑定到事件循环B上去了, 这就导致程序的混乱和错误.

      如果要避免这种情况, 可以采取以下两种措施;

    • 首先, 尽量使用asyncio.get_running_loop而非asyncio.get_event_loop, 这样能确保获取的事件循环就是当前运行的那个
    • 然后, 如果调用的API可以接收loop参数, 就传入loop参数以保证事件循环的一致性. 我统计了一下, asyncio模块的task.py共定义了42个函数, 其中有14个函数能接收loop参数.

    六. asyncio.wait&asyncio.gather

      之前讲过, 如果要同时运行多个协程, 可以使用asyncio.wait将其打包, 其实asyncio中还有一个效果类似的函数, 即asyncio.gather.

    1. wait&gather的实现原理

      gather和wait函数的主要代码如下(有删改):
    import asyncio


    def gather(*coros): def _done_callback(fut): nonlocal nfinished nfinished += 1 if nfinished == nfuts: results = [] for fut in children: res = fut.result() results.append(res) outer.set_result(results) nfuts = 0 nfinished = 0 children = [] for c in coros: fut = asyncio.create_task(c) fut.add_done_callback(_done_callback) children.append(fut) nfuts = len(children) outer = _GatherFuture(children) return outer class _GatherFuture(asyncio.Future): ''' 这个future类是gather函数的辅助 如果调用它的cancel方法, 就会把所有的子协程都取消 ''' def __init__(self, children): super().__init__() self._children = children def cancel(self): if self.done(): return False ret = False for child in self._children: if child.cancel(): ret = True return ret async def wait(fs): fs = {asyncio.create_task(f) for f in set(fs)} loop = asyncio.get_running_loop() waiter = loop.create_future() counter = len(fs) def _on_completion(f): nonlocal counter counter -= 1 if counter <= 0: waiter.set_result(None) for f in fs: f.add_done_callback(_on_completion) await waiter done, pending = set(), set() for f in fs: f.remove_done_callback(_on_completion) if f.done(): done.add(f) else: pending.add(f) return done, pending
      gather和wait的实现原理是大同小异的,都利用了future和task的特性. future可以视为一个暂停点, 调用future.set_result, 就等于这次暂停结束了, task则等于对协程的封装, 不但可以驱动协程, 而且在协程结束的时候会调用设定的回调函数. 
      基于future和task的特性, 首先将协程都封装成task, 同时用一个变量统计协程数,并且实例化一个future对象, 让程序停在future这里. 每个协程一结束,就调用回调更改计数器的值,根据计数器的值确定是不是所有协程都结束了, 是的话就调用future对象的set_result结束本次暂停. 这样, 等所有协程都结束之后, wait&gather函数也就结束了.

    2. wait&gather的使用差异

      首先, 二者都是接收协程, 实现这些协程并发的效果, 其中wait方法是将协程放在一个可迭代对象中,然后作为一个参数传入, 而gather则是使用任意参数列表, 即可以传入多个协程:
    # coros是一个包含了多个协程的可迭代对象, 比如一个协程列表
    asyncio.wait(coros)
    # 如果使用gather,应该用以下两种方式传参
    asyncio.gather(*coros)
    asyncio.gather(coro1,coro2,coro3)

      然后, wait是一个协程函数, 因此直接调用会返回一个协程对象, 可以使用asyncio.run运行. 但是gather则是返回一个future对象, 因此与asyncio.run不兼容:

    # run收到的参数是一个协程,没毛病
    asyncio.run(asyncio.wait(coros))
    # run收到一个future对象,因此下面这句话会报错
    asyncio.run(asyncio.gather(*coros))
    # 如果要运行gather打包的协程, 可以用以下两种方式:
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(*coros))
    # 或者
    async def main():
        res = await asyncio.gather(*coros)
    
    asyncio.run(main())

      另一方面, 由于gather函数返回的是一个future对象, 因此可以对其调用cancel方法来提前结束, 此时gather内所有的协程都会被取消.

      最后, 由上面的代码可知, 二者的完成时的返回值有差异, wait会返回done和pending两个集合, 即已完成和未完成的协程集合, 而gather则是直接把协程的结果放在列表中返回.

    async def func(coros):
        # wait会返回两个集合, 分别存放coros中已完成和未完成的协程
        done_coros, pending_coros = await asyncio.wait(coros)
        # gather则是直接把协程的结果放在一个列表中返回
        results = await asyncio.gather(*coros)

    3. wait&gather函数的附加功能

      wait和gather函数的附加功能主要体现在其附加的参数上.

      wait可以设置timeout和return_when两个参数, 从而让wait函数提前结束, 这时未完成的协程就会被放入pending这个集合中返回. 不过, wait并不会将这些未完成的协程取消掉.

    import asyncio
    import concurrent.futures
    
    
    # return_when只能设置以下三个值, 不然会报错
    concurrent.futures.FIRST_COMPLETED # 在有一个协程结束时返回
    concurrent.futures.FIRST_EXCEPTION # 在有一个协程抛出异常时返回
    concurrent.futures.ALL_COMPLETE # 在所有协程都结束时返回, 这是默认值
    
    
    async def main(coros):
        # timeout表示几秒后返回, 不设置则不起作用
        done, pending = await asyncio.wait(coros, timeout=3, return_when=concurrent.futures.FIRST_COMPLETED)

      需要注意的是, 就算将return_when设置为FIRST_COMPLETED,  如果多个协程的完成时间相近, 那么这几个协程可能都会被放到done集合中. FIRST_EXCEPTION同理.

      gather可以设置return_exceptions这个参数, 其默认值为False, 如果设置为True, 那么协程在运行中的错误将不会直接抛出, 而是将错误信息作为返回结果, 存入最后返回的结果列表中.

    按照python官方文档的说法, 从python3.8开始, wait函数不会将传入的协程封装为task对象, 也就是说, 在将协程传入wait函数之前, 应该先调用asyncio.create_task方法将协程转化为task对象. 详细说明可以看https://docs.python.org/zh-cn/3/library/asyncio-task.html?highlight=asyncio%20wait#asyncio.wait.
    但是, 我今天特地安装了一个python3.8进行测试, 发现并没有这回事, 传一个协程列表给wait, 程序还是可以正常运行. 而且wait内部的代码也没改, 还是会调用ensure_future将协程封装成task对象.
    总之, 这里还是注意一下吧, 指不定哪天就变了.
    今天是2020-1-7.
    补充说明

    七. 一个简单的爬虫实例

      以https://xkcd.com/这个网站为例, 这是一个漫画网站, 每张漫画对应的页面是https://xkcd.com/int/, 其中int为1-2244之间的整数. 现在从网站上下载一百张漫画, 代码如下:

    import asyncio
    import os
    import random
    import re
    import time
    
    import aiofiles
    import aiohttp
    
    
    class Producer:
    
        def __init__(self, session, queue):
            self.session = session
            self.regex = re.compile(
                '''<div id="comic">
    <img src="(//imgs.xkcd.com/comics/[a-z0-9_]+.(jpg|png))"''')
            self.q = queue
            self.urls = self.url_factory()
    
        async def start(self):
            workers = [self.worker() for i in range(10)]
            await asyncio.wait(workers)
            self.q.put_nowait('all tasks are done')
    
        async def worker(self):
            while self.urls:
                url = self.urls.pop()
                html = await self.fetch(url)
                await self.pares_html(html)
    
        def url_factory(self, n=100):
            # 返回一个含有n个url的集合
            urls = set()
            while len(urls) < n:
                urls.add('https://xkcd.com/{}/'.format(random.randrange(1, 2245)))
            return urls
    
        async def fetch(self, url):
            async with self.session.get(url) as response:
                assert response.status == 200
                return await response.text()
    
        async def pares_html(self, html):
            res = re.search(self.regex, html)
            if res is None:
                return
            img_url = 'https:'+res.group(1)
            await self.q.put(img_url)
    
    
    class Consumer:
    
        def __init__(self, session, queue):
            self.session = session
            self.q = queue
            self.save_folder = os.path.join(os.path.dirname(__file__), 'webcomics')
            try:
                os.mkdir(self.save_folder)
            except FileExistsError:
                pass
    
        async def start(self):
            workers = [self.worker() for i in range(10)]
            await asyncio.wait(workers)
    
        async def worker(self):
            while 1:
                url = await self.q.get()
                if url == 'all tasks are done':
                    break
                await self.download(url)
            self._on_worker_done()
    
        def _on_worker_done(self):
            self.q.put_nowait('all tasks are done')
    
        async def download(self, url):
            filename = os.path.split(url)[-1]
            save_path = os.path.join(self.save_folder, filename)
            if os.path.exists(save_path):
                return
            async with self.session.get(url) as response:
                assert response.status == 200
                content = await response.read()
                async with aiofiles.open(save_path, 'wb') as f:
                    await f.write(content)
    
    
    async def main():
        queue = asyncio.Queue(maxsize=100)
        async with aiohttp.ClientSession() as session:
            producer = Producer(session, queue)
            consumer = Consumer(session, queue)
            tasks = asyncio.gather(producer.start(), consumer.start())
            await tasks
    
    
    if __name__ == '__main__':
        asyncio.run(main())

      这段代码主要分为两部分: producer和consumer, 即生产者和消费者. 生产者负责生成图片下载链接, 消费者则处理这些链接, 将对应的图片下载到本地.

    1. producer代码解析

      首先, producer需要两个参数, session和queue. session是连接池, 其特点是结束之后, 对应的连接不会断开, 下次需要同一个目标的连接时, 直接使用上次未断开的连接, 也就是实现了tcp连接的重复使用. 由于本次连接的目标都是同一个网站, 因此使用连接池可以减少创建和断开tcp连接的开销(三次握手, 四次挥手). queue则是队列, 生产者产生图片下载链接之后, 不直接交给消费者, 而是放在队列中让消费者自取. 这样二者不是同步的, 效率低的一方不会阻塞到效率高的另一方. 

      producer的工作策略是, 首先产生好100个网页链接存在self.urls这个列表中, 然后让worker不断从集合中取出url. worker方法首先调用fetch方法获取到对应的html, 然后调用parse_html提取html中的图片链接, 将图片链接储存到队列中. 

      这段代码并不算复杂, 需要注意的有三点.

    • 首先, producer调用start方法开始工作, 这个方法创建了10个worker协程, 相当于最大并发数为10. 之所以只创建10个协程, 一方面是因为爬取的数量不多, 没必要爬太狠, 一方面是创建过多的tcp连接, 也会造成较大的开销(虽然连接池有tcp复用机制, 但是多个协程同时在爬, 每个协程都需要一个tcp连接). 如果要限制协程的并发量, 除了限制创建协程的数量之外, 还可以使用信号量的方式, 这部分可以在asyncio的高级api中找到, 用法比较简单. 
    • 然后, 这里worker从self.urls中取值的方式是直接取, 这里不需要担心线程安全问题, 因为协程只会在指定的位置, 比如await时切换到其它协程, 也就是说, 从self.urls中取值这一操作是原子性的.

    2. consumer代码解析

      consumer类的代码与producer相似, 本身也不算复杂, 主要变化就是把fetch换成download, 将请求的数据直接写入本地的图片文件. 值得讲讲的就是这里停止消费者的机制:

    • 由于本次只爬取100张图片, 所以爬取结束之后, 消费者应该停止. 这里停止消费者的机制是, 生产者结束后, 往队列中放 "all tasks are done" 这一语句来通知消费者, 消费者的某个worker收到这一消息后, 就停止, 并将这一消息再放入队列来通知其它worker, 从而让所有worker都停止. 

      如果要停止消费者, 另外一个方式是使用asyncio.Queue自带的join方法. 首先, 将 "all tasks are done" 这一机制删除, 然后, 将start和worker方法的代码修改如下:

    async def start(self):
        workers = [asyncio.Task(self.worker()) for i in range(10)]
        await self.q.join()
        for worker in workers:
            worker.cancel()
    
    async def worker(self):
        while 1:
            url = await self.q.get()
            await self.download(url)
            self.q.task_done()

      asyncio.Queue内部带有一个future对象和一个_unfinished_tasks计数, 当往队列添加数据时, _unfinished_tasks+1, 调用task_done方法时, _unfinished_tasks-1, 并且如果此时_unfinished_task为0, 就调用future对象的set_result方法.

      这种停止机制的关键就在于队列: 调用队列的join方法会返回队列的future对象, 因此, await self.q.join()这句话实际是在等待future对象结束. start方法首先用asyncio.Task对worker进行封装, 使其开始运行. worker每从队列中取出一个url并下载图片, 就调用一次task_done. 这样队列为空时, 队列的future对象就结束了, 于是start协程再反过来将worker协程取消.

      这种方法的问题是, 首先, 这里的生产者和消费者是同时运行的, 当生产者没有产生数据之前, 队列为空, 此时消费者就会直接结束. 然后, 如果生产者效率比消费者低很多, 在生产中途出现了队列为空的情况, 此时消费者也可能提前结束, 因此这种方法更适用于消费者消费队列中已有数据的场景, 在这里不适用.

    在使用爬虫获取网站内容之前, 应该先查看这个网站的robots协议, 该协议放在root_url/robots.txt, 比如https://xkcd.com/的robots协议可以在https://xkcd.com/robots.txt看到. 这个协议规定了允许的爬虫和可以爬取的目录.
    比如:
    User-agent: *
    Disallow: /personal/
    表示任何爬虫都可以爬, 但是不能爬取/personal/目录下的内容.
    补充说明
  • 相关阅读:
    加入创业公司有什么利弊
    Find Minimum in Rotated Sorted Array II
    Search in Rotated Sorted Array II
    Search in Rotated Sorted Array
    Find Minimum in Rotated Sorted Array
    Remove Duplicates from Sorted Array
    Spiral Matrix
    Spiral Matrix II
    Symmetric Tree
    Rotate Image
  • 原文地址:https://www.cnblogs.com/q1214367903/p/12091714.html
Copyright © 2011-2022 走看看