zoukankan      html  css  js  c++  java
  • python开发者的AsyncIO

    Threads, loops, coroutines and futures

    线程是一种常用工具,大多数开发人员以前都听过并使用过。然而,asyncio使用完全不同的结构:事件循环协程和future

    • 一个事件循环基本上是管理和分配的不同任务的执行。它注册它们并处理它们之间的控制流分配。
    • 协同程序是与Python生成器类似的特殊函数,等待它们将控制流释放回事件循环。A协程需要调度到事件循环运行,once scheduled coroutines are wrapped in Taskwhich is a type of Future
    • future是表示可能或可能未执行的任务结果的对象。这个结果可能是个exception。

    很简单吧?让我们继续!

     

    同步和异步执行

    Concurrency中并不是并行将任务分解为并发子任务只允许并行,这是创建它的这些子任务的调度。

    Asyncio正是如此,您可以构建代码,以便将子任务定义为协同程序,并允许您根据需要安排它们,包括同时。协同程序包含屈服点,我们定义可能的点,如果其他任务处于挂起状态,则可能发生上下文切换,但如果没有其他任务挂起则不会。

    asyncio中的上下文切换表示事件循环,产生从一个协程到下一个协同程序的控制流。我们来看一个非常基本的例子:

     1 import asyncio
     2 
     3 
     4 async def foo():
     5     print('Running in foo')
     6     await asyncio.sleep(0)
     7     print('Explicit context switch to foo again')
     8 
     9 
    10 async def bar():
    11     print('Explicit context to bar')
    12     await asyncio.sleep(0)
    13     print('Implicit context switch back to bar')
    14 
    15 
    16 async def main():
    17     tasks = [foo(), bar()]
    18     await asyncio.gather(*tasks)
    19 
    20 
    21 asyncio.get_event_loop().run_until_complete(main())


    Explicit context to bar Running in foo Implicit context switch back to bar Explicit context switch to foo again
    • 首先,我们声明了几个简单的协同程序,这些协同程序假装使用asyncio中sleep函数进行非阻塞工作
    • 然后我们创建一个入口点协同程序,我们使用gather 合并先前的协程,以等待它们完成gather的内容比这更多,但我们暂时忽略它。
    • 最后我们使用asyncio.run_until_complete安排我们的入口点协程,它将负责创建一个事件循环并安排我们的入口点协同程序。 

    通过在另一个协程上使用await,我们声明协程可以将控制权交给事件循环,在本例中为sleep协程将yield,事件循环将上下文切换到计划执行的下一个任务:bar类似地,bar协程使用await sleep,它允许事件循环在之前产生的点处将控制权传递给foo,就像普通的Python生成器一样。 

    现在让我们模拟两个阻塞任务,gr1gr2,说它们是对外部服务的两个请求。当执行第三个任务时,可以异步执行任务,如下例所示:

     1 import time
     2 import asyncio
     3 
     4 start = time.time()
     5 
     6 
     7 def tic():
     8     return 'at %1.1f seconds' % (time.time() - start)
     9 
    10 
    11 async def gr1():
    12     # Busy waits for a second, but we don't want to stick around...
    13     print('gr1 started work: {}'.format(tic()))
    14     await asyncio.sleep(2)
    15     print('gr1 ended work: {}'.format(tic()))
    16 
    17 
    18 async def gr2():
    19     # Busy waits for a second, but we don't want to stick around...
    20     print('gr2 started work: {}'.format(tic()))
    21     await asyncio.sleep(2)
    22     print('gr2 Ended work: {}'.format(tic()))
    23 
    24 
    25 async def gr3():
    26     print("Let's do some stuff while the coroutines are blocked, {}".format(tic()))
    27     await asyncio.sleep(1)
    28     print("Done!")
    29 
    30 
    31 async def main():
    32     tasks = [gr1(), gr2(), gr3()]
    33     await asyncio.gather(*tasks)
    34 
    35 
    36 asyncio.get_event_loop().run_until_complete(main())
    37 
    38 gr2 started work: at 0.0 seconds
    39 Let's do some stuff while the coroutines are blocked, at 0.0 seconds
    40 gr1 started work: at 0.0 seconds
    41 Done!
    42 gr2 Ended work: at 2.0 seconds
    43 gr1 ended work: at 2.0 seconds
    44 
    45 Process finished with exit code 0

    注意事件循环如何管理和调度执行,允许我们的单线程代码同时运行。当两个阻塞任务被阻止时,第三个阻塞任务可以控制流程

    执行顺序

    在同步世界中,我们习惯于线性思考。如果我们要完成一系列占用不同时间的任务,它们将按照它们被调用的顺序执行。

    但是,在使用并发时,我们需要知道任务的完成顺序与计划顺序不同。

     1 import random
     2 from time import sleep
     3 import asyncio
     4 
     5 
     6 def task(pid):
     7     """Synchronous non-deterministic task."""
     8     sleep(random.randint(0, 2) * 0.001)
     9     print('Task %s done' % pid)
    10 
    11 
    12 async def task_coro(pid):
    13     """Coroutine non-deterministic task"""
    14     await asyncio.sleep(random.randint(0, 2) * 0.001)
    15     print('Task %s done' % pid)
    16 
    17 
    18 def synchronous():
    19     for i in range(1, 10):
    20         task(i)
    21 
    22 
    23 async def asynchronous():
    24     tasks = [task_coro(i) for i in range(1, 10)]
    25     await asyncio.gather(*tasks)
    26 
    27 
    28 print('Synchronous:')
    29 synchronous()
    30 
    31 print('Asynchronous:')
    32 
    33 asyncio.get_event_loop().run_until_complete(asynchronous())
    34 
    35 
    36 Synchronous:
    37 Task 1 done
    38 Task 2 done
    39 Task 3 done
    40 Task 4 done
    41 Task 5 done
    42 Task 6 done
    43 Task 7 done
    44 Task 8 done
    45 Task 9 done
    46 Asynchronous:
    47 Task 5 done
    48 Task 2 done
    49 Task 3 done
    50 Task 4 done
    51 Task 1 done
    52 Task 8 done
    53 Task 9 done
    54 Task 6 done
    55 Task 7 done

    当然,您的输出会有所不同,因为每个任务都会随机休眠一段时间,但请注意结果顺序是如何完全不同的,即使我们使用范围以相同的顺序构建任务数组重要的是要理解asyncio不会神奇地使事情变得非阻塞。在编写asyncio时,标准库中独立存在,其余模块仅提供阻塞功能。您可以使用concurrent.futures模块将阻塞任务包装在线程或进程中,并返回Asyncio可以使用的Future 

     

    这可能是使用asyncio时的主要缺点,但是有很多库用于不同的任务和服务

    当然,一个非常常见的阻塞任务是从HTTP服务获取数据。我正在使用优秀的aiohttp库来处理从Github的公共事件API中检索数据的非阻塞HTTP请求,并简单地采用Date响应头。

    请不要关注aiohttp_get下面协程的细节它们使用异步上下文管理器语法,这超出了本文的范围,但是使用aiohttp执行异步HTTP请求是必要的方式只是假装是一个外部协程,并专注于它如何在下面使用。

     1 import time
     2 import urllib.request
     3 import asyncio
     4 import aiohttp
     5 
     6 URL = 'https://api.github.com/events'
     7 MAX_CLIENTS = 3
     8 
     9 
    10 def fetch_sync(pid):
    11     print('Fetch sync process {} started'.format(pid))
    12     start = time.time()
    13     response = urllib.request.urlopen(URL)
    14     datetime = response.getheader('Date')
    15 
    16     print('Process {}: {}, took: {:.2f} seconds'.format(
    17         pid, datetime, time.time() - start))
    18 
    19     return datetime
    20 
    21 
    22 async def aiohttp_get(url):
    23     """Nothing to see here, carry on ..."""
    24     async with aiohttp.ClientSession() as session:
    25         async with session.get(url) as response:
    26             return response
    27 
    28 
    29 async def fetch_async(pid):
    30     print('Fetch async process {} started'.format(pid))
    31     start = time.time()
    32     response = await aiohttp_get(URL)
    33     datetime = response.headers.get('Date')
    34 
    35     print('Process {}: {}, took: {:.2f} seconds'.format(
    36         pid, datetime, time.time() - start))
    37 
    38     response.close()
    39     return datetime
    40 
    41 
    42 def synchronous():
    43     start = time.time()
    44     for i in range(1, MAX_CLIENTS + 1):
    45         fetch_sync(i)
    46     print("Process took: {:.2f} seconds".format(time.time() - start))
    47 
    48 
    49 async def asynchronous():
    50     start = time.time()
    51     tasks = [asyncio.ensure_future(
    52         fetch_async(i)) for i in range(1, MAX_CLIENTS + 1)]
    53     await asyncio.wait(tasks)
    54     print("Process took: {:.2f} seconds".format(time.time() - start))
    55 
    56 
    57 print('Synchronous:')
    58 synchronous()
    59 
    60 print('Asynchronous:')
    61 
    62 asyncio.get_event_loop().run_until_complete(asynchronous())
    63 
    64 Synchronous:
    65 Fetch sync process 1 started
    66 Process 1: Fri, 07 Dec 2018 07:02:06 GMT, took: 1.50 seconds
    67 Fetch sync process 2 started
    68 Process 2: Fri, 07 Dec 2018 07:02:07 GMT, took: 1.09 seconds
    69 Fetch sync process 3 started
    70 Process 3: Fri, 07 Dec 2018 07:02:08 GMT, took: 1.18 seconds
    71 Process took: 3.76 seconds
    72 Asynchronous:
    73 Fetch async process 1 started
    74 Fetch async process 2 started
    75 Fetch async process 3 started
    76 Process 3: Fri, 07 Dec 2018 07:02:10 GMT, took: 1.22 seconds
    77 Process 1: Fri, 07 Dec 2018 07:02:10 GMT, took: 1.22 seconds
    78 Process 2: Fri, 07 Dec 2018 07:02:10 GMT, took: 1.28 seconds
    79 Process took: 1.28 seconds
    80 
    81 Process finished with exit code 0

    首先,注意时间差异,通过使用异步调用我们同时对服务的所有请求进行。如上所述,每个请求都会产生到下一个控制流的控制流,并在完成时返回。结果是请求和检索所有请求的结果只需要最慢的请求!查看最慢请求的时间如何记录1.28秒,这是处理所有请求所用的总时间。很酷,对吧?

    其次,看看代码与同步版本的相似程度!它基本上是一样的!主要区别在于执行GET请求和创建任务并等待它们完成的库实现。

    创建并发

    到目前为止,我们一直在使用一种方法来创建和检索协同程序的结果,创建一组任务并等待所有任务完成。

    但协同程序可以安排以不同的方式运行或检索其结果。想象一下,我们需要在它们到达时立即处理HTTP GET请求的结果,这个过程实际上与前面的例子非常相似

    注意填充和每个结果调用的时间,它们是同时执行调用的,结果是无序的,我们会尽快处理它们。

    这种情况下的代码只是略有不同,我们将协同程序收集到一个列表中,每个都准备好进行调度和执行。as_completed 函数返回一个迭代,因为他们进来,将产生一个完整的future。现在不要告诉我,这不是很酷。顺便说一句,as_completed最初来自concurrent.futures 模块。

    让我们再举一个例子,假设您正在尝试获取您的IP地址。您可以使用类似的服务来检索它,但您不确定它们是否可以在运行时访问。你不想按顺序检查每一个。你会向每个服务发送并发请求并选择第一个响应的服务,对吧?对!

    好吧,还有一种方法可以在asyncio中调度任务wait碰巧有一个参数可以做到这一点:return_when但现在我们想从协程中检索结果,因此我们可以使用两组futures, done and pending.

    在下一个例子中,我们将使用pre Python 3.7在asyncio中启动的方式来说明一点,请耐心等待:

     1 from collections import namedtuple
     2 import time
     3 import asyncio
     4 from concurrent.futures import FIRST_COMPLETED
     5 import aiohttp
     6 
     7 Service = namedtuple('Service', ('name', 'url', 'ip_attr'))
     8 
     9 SERVICES = (
    10     Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    11     Service('ip-api', 'http://ip-api.com/json', 'query')
    12 )
    13 
    14 
    15 async def aiohttp_get_json(url):
    16     async with aiohttp.ClientSession() as session:
    17         async with session.get(url) as response:
    18             return await response.json()
    19 
    20 
    21 async def fetch_ip(service):
    22     start = time.time()
    23     print('Fetching IP from {}'.format(service.name))
    24 
    25     json_response = await aiohttp_get_json(service.url)
    26     ip = json_response[service.ip_attr]
    27 
    28     return '{} finished with result: {}, took: {:.2f} seconds'.format(
    29         service.name, ip, time.time() - start)
    30 
    31 
    32 async def main():
    33     futures = [fetch_ip(service) for service in SERVICES]
    34     done, pending = await asyncio.wait(
    35         futures, return_when=FIRST_COMPLETED)
    36 
    37     print(done.pop().result())
    38 
    39 
    40 ioloop = asyncio.get_event_loop()
    41 ioloop.run_until_complete(main())
    42 ioloop.close()
    43 
    44 Fetching IP from ipify
    45 Fetching IP from ip-api
    46 ip-api finished with result: 58.135.78.16, took: 0.44 seconds
    47 Task was destroyed but it is pending!
    48 task: <Task pending coro=<fetch_ip() running at /usr/local/envs/async/201812/04.py:29> wait_for=<Future pending cb=[BaseSelectorEventLoop._sock_connect_done(6)(), <TaskWakeupMethWrapper object at 0x7fbb9cfe8c48>()]>>
    49 
    50 Process finished with exit code 0

    等等,那里发生了什么?第一个服务响应得很好,但所有这些警告是什么?

    好吧,我们安排了两个任务,但是一旦第一个任务完成关闭循环,第二个任务挂起。Asyncio认为这是一个错误并打印出警告。我们真的应该自己清理一下,让事件循环知道不要打扰待定的futures

    Future states

    (正如在Future可以进入的状态,而不是Future的状态......你知道我的意思)

    这些是:

    • Pending
    • Running
    • Done
    • Cancelled

    就如此容易。当一个未来完成时,它的结果方法将返回未来的结果,如果它正在挂起或正在运行它会引发InvalidStateError,如果它被取消它将引发CancelledError,最后如果协同程序引发异常它将再次引发,这是与调用异常相同的行为但是不要相信我的话

    如果Future处于该状态,您还可以在Future上调用done cancelledrunnin g来获取布尔值,请注意,done表示结果将返回或引发异常。您可以通过调用cancel方法(说来也奇怪)来具体取消Future ,这正是asyncio.runPython 3.7中引人注目的内容,因此您不必担心它

     1 from collections import namedtuple
     2 import time
     3 import asyncio
     4 import aiohttp
     5 
     6 Service = namedtuple('Service', ('name', 'url', 'ip_attr'))
     7 
     8 SERVICES = (
     9     Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    10     Service('ip-api', 'http://ip-api.com/json', 'query'),
    11     Service('borken', 'http://no-way-this-is-going-to-work.com/json', 'ip')
    12 )
    13 
    14 async def aiohttp_get_json(url):
    15     async with aiohttp.ClientSession() as session:
    16         async with session.get(url) as response:
    17             return await response.json()
    18 
    19 
    20 async def fetch_ip(service):
    21     start = time.time()
    22     print('Fetching IP from {}'.format(service.name))
    23 
    24     try:
    25         json_response = await aiohttp_get_json(service.url)
    26     except:
    27         return '{} is unresponsive'.format(service.name)
    28 
    29     ip = json_response[service.ip_attr]
    30 
    31     return '{} finished with result: {}, took: {:.2f} seconds'.format(
    32         service.name, ip, time.time() - start)
    33 
    34 
    35 async def main():
    36     futures = [fetch_ip(service) for service in SERVICES]
    37     done, _ = await asyncio.wait(futures)
    38 
    39     for future in done:
    40         print(future.result())
    41 
    42 
    43 asyncio.run(main())
    44 
    45 Fetching IP from ip-api
    46 Fetching IP from ipify
    47 Fetching IP from borken
    48 ipify finished with result: 81.106.46.223, took: 5.35 seconds
    49 borken is unresponsive
    50 ip-api finished with result: 81.106.46.223, took: 4.91 seconds

    同样的,这是Python3.7之前版本的写法

     1 from collections import namedtuple
     2 import time
     3 import asyncio
     4 from concurrent.futures import FIRST_COMPLETED
     5 import aiohttp
     6 
     7 Service = namedtuple('Service', ('name', 'url', 'ip_attr'))
     8 
     9 SERVICES = (
    10     Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    11     Service('ip-api', 'http://ip-api.com/json', 'query')
    12 )
    13 
    14 
    15 async def aiohttp_get_json(url):
    16     async with aiohttp.ClientSession() as session:
    17         async with session.get(url) as response:
    18             return await response.json()
    19 
    20 
    21 async def fetch_ip(service):
    22     start = time.time()
    23     print('Fetching IP from {}'.format(service.name))
    24 
    25     json_response = await aiohttp_get_json(service.url)
    26     ip = json_response[service.ip_attr]
    27 
    28     return '{} finished with result: {}, took: {:.2f} seconds'.format(
    29         service.name, ip, time.time() - start)
    30 
    31 
    32 async def main():
    33     futures = [fetch_ip(service) for service in SERVICES]
    34     done, pending = await asyncio.wait(
    35         futures, return_when=FIRST_COMPLETED)
    36 
    37     print(done.pop().result())
    38     for task in pending:
    39         task.cancel()
    40 
    41 loop = asyncio.get_event_loop()
    42 loop.run_until_complete(main())
    43 loop.close
    45 
    46 
    47 
    48 Fetching IP from ipify
    49 Fetching IP from ip-api
    50 ipify finished with result: 58.135.78.16, took: 1.20 seconds
    51 
    52 Process finished with exit code 0

    这种类型的“Task is destroyed but is was pending”错误在使用asyncio时非常常见,现在你知道它背后的原因以及如何避免它

    Futures还允许在回到完成状态时附加回调,以防您想要添加额外的逻辑。您甚至可以手动设置Future的结果或exception ,通常用于单元测试。

     

    Exception handling

    Asyncio的全部内容是使并发代码易于管理和可读,并且在处理异常时变得非常明显。让我们回到一个例子来说明这一点。

    想象一下,我们希望确保我们所有的IP服务都返回相同的结果,但我们的一项服务是脱机的而不是解决方案。我们可以简单地使用try ...except 

    像这样:

    from collections import namedtuple
    import time
    import asyncio
    import aiohttp
    
    Service = namedtuple('Service', ('name', 'url', 'ip_attr'))
    
    SERVICES = (
        Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
        Service('ip-api', 'http://ip-api.com/json', 'query'),
        Service('borken', 'http://no-way-this-is-going-to-work.com/json', 'ip')
    )
    
    async def aiohttp_get_json(url):
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.json()
    
    
    async def fetch_ip(service):
        start = time.time()
        print('Fetching IP from {}'.format(service.name))
    
        try:
            json_response = await aiohttp_get_json(service.url)
        except:
            return '{} is unresponsive'.format(service.name)
    
        ip = json_response[service.ip_attr]
    
        return '{} finished with result: {}, took: {:.2f} seconds'.format(
            service.name, ip, time.time() - start)
    
    
    async def main():
        futures = [fetch_ip(service) for service in SERVICES]
        done, _ = await asyncio.wait(futures)
    
        for future in done:
            print(future.result())
    
    
    asyncio.get_event_loop().run_until_complete(main())
    
    
    Connected to pydev debugger (build 183.4284.139)
    Fetching IP from borken
    Fetching IP from ip-api
    Fetching IP from ipify
    borken is unresponsive
    ipify finished with result: 58.135.78.16, took: 34.84 seconds
    ip-api finished with result: 58.135.78.16, took: 12.60 seconds
    
    Process finished with exit code 0

    我们还可以在处理Futures结果时处理异常,以防发生意外异常:

     1 from collections import namedtuple
     2 import time
     3 import asyncio
     4 import aiohttp
     5 import traceback
     6 
     7 Service = namedtuple('Service', ('name', 'url', 'ip_attr'))
     8 
     9 SERVICES = (
    10     Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    11     Service('ip-api', 'http://ip-api.com/json', 'this-is-not-an-attr'),
    12     Service('borken', 'http://no-way-this-is-going-to-work.com/json', 'ip')
    13 )
    14 
    15 async def aiohttp_get_json(url):
    16     async with aiohttp.ClientSession() as session:
    17         async with session.get(url) as response:
    18             return await response.json()
    19 
    20 
    21 async def fetch_ip(service):
    22     start = time.time()
    23     print('Fetching IP from {}'.format(service.name))
    24 
    25     try:
    26         json_response = await aiohttp_get_json(service.url)
    27     except:
    28         return '{} is unresponsive'.format(service.name)
    29 
    30     ip = json_response[service.ip_attr]
    31 
    32     return '{} finished with result: {}, took: {:.2f} seconds'.format(
    33         service.name, ip, time.time() - start)
    34 
    35 
    36 async def main():
    37     futures = [fetch_ip(service) for service in SERVICES]
    38     done, _ = await asyncio.wait(futures)
    39 
    40     for future in done:
    41         try:
    42             print(future.result())
    43         except:
    44             print("Unexpected error: {}".format(traceback.format_exc()))
    45 
    46 
    47 asyncio.get_event_loop().run_until_complete(main())
    48 
    49 
    50 Fetching IP from ip-api
    51 Fetching IP from ipify
    52 Fetching IP from borken
    53 ipify finished with result: 58.135.78.16, took: 1.15 seconds
    54 Unexpected error: Traceback (most recent call last):
    55   File "/usr/local/envs/async/201812/异常处理在处理Futures时.py", line 46, in main
    56     print(future.result())
    57   File "/usr/local/envs/async/201812/异常处理在处理Futures时.py", line 34, in fetch_ip
    58     ip = json_response[service.ip_attr]
    59 KeyError: 'this-is-not-an-attr'
    60 
    61 borken is unresponsive
    62 
    63 Process finished with exit code 0

    同样地,调度一个任务而不等待它完成被认为是一个bug,调度一个任务而不检索可能引发的异常也会抛出一个警告:

     1 from collections import namedtuple
     2 import time
     3 import asyncio
     4 import aiohttp
     5 
     6 Service = namedtuple('Service', ('name', 'url', 'ip_attr'))
     7 
     8 SERVICES = (
     9     Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    10     Service('ip-api', 'http://ip-api.com/json', 'this-is-not-an-attr'),
    11     Service('borken', 'http://no-way-this-is-going-to-work.com/json', 'ip')
    12 )
    13 
    14 async def aiohttp_get_json(url):
    15     async with aiohttp.ClientSession() as session:
    16         async with session.get(url) as response:
    17             return await response.json()
    18 
    19 
    20 async def fetch_ip(service):
    21     start = time.time()
    22     print('Fetching IP from {}'.format(service.name))
    23 
    24     try:
    25         json_response = await aiohttp_get_json(service.url)
    26     except:
    27         print('{} is unresponsive'.format(service.name))
    28     else:
    29         ip = json_response[service.ip_attr]
    30 
    31         print('{} finished with result: {}, took: {:.2f} seconds'.format(
    32             service.name, ip, time.time() - start))
    33 
    34 
    35 async def main():
    36     futures = [fetch_ip(service) for service in SERVICES]
    37     await asyncio.wait(futures)  # intentionally ignore results
    38 
    39 
    40 asyncio.get_event_loop().run_until_complete(main())
    41 
    42 
    43 Fetching IP from ip-api
    44 Fetching IP from ipify
    45 Fetching IP from borken
    46 borken is unresponsive
    47 Task exception was never retrieved
    48 ipify finished with result: 58.135.78.16, took: 1.29 seconds
    49 future: <Task finished coro=<fetch_ip() done, defined at /usr/local/envs/async/201812/a.py:24> exception=KeyError('this-is-not-an-attr',)>
    50 Traceback (most recent call last):
    51   File "/usr/local/envs/async/201812/a.py", line 33, in fetch_ip
    52     ip = json_response[service.ip_attr]
    53 KeyError: 'this-is-not-an-attr'

    这看起来非常像我们上一个例子的输出,减去了来自asyncio的tut-tut消息

    Timeouts

    如果我们不太关心IP?想象一下,它是一个更复杂的响应的一个很好的补充,但我们当然不希望让用户等待它。理想情况下,我们会将非阻塞调用设置为超时,之后我们只发送没有IP属性的复杂响应。

    再次等待 只有我们需要的属性:

    import time
    import random
    import asyncio
    import aiohttp
    import argparse
    from collections import namedtuple
    from concurrent.futures import FIRST_COMPLETED
    
    Service = namedtuple('Service', ('name', 'url', 'ip_attr'))
    
    SERVICES = (
        Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
        Service('ip-api', 'http://ip-api.com/json', 'query'),
    )
    
    DEFAULT_TIMEOUT = 0.01
    
    
    async def aiohttp_get_json(url):
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.json()
    
    
    async def fetch_ip(service):
        start = time.time()
        print('Fetching IP from {}'.format(service.name))
    
        await asyncio.sleep(random.randint(1, 3) * 0.1)
        try:
            json_response = await aiohttp_get_json(service.url)
        except:
            return '{} is unresponsive'.format(service.name)
    
        ip = json_response[service.ip_attr]
    
        print('{} finished with result: {}, took: {:.2f} seconds'.format(
            service.name, ip, time.time() - start))
        return ip
    
    
    async def main(timeout):
        response = {
            "message": "Result from asynchronous.",
            "ip": "not available"
        }
    
        futures = [fetch_ip(service) for service in SERVICES]
        done, pending = await asyncio.wait(
            futures, timeout=timeout, return_when=FIRST_COMPLETED)
    
        for future in pending:
            future.cancel()
    
        for future in done:
            response["ip"] = future.result()
    
        print(response)
    
    
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-t', '--timeout',
        help='Timeout to use, defaults to {}'.format(DEFAULT_TIMEOUT),
        default=DEFAULT_TIMEOUT, type=float)
    args = parser.parse_args()
    
    print("Using a {} timeout".format(args.timeout))
    asyncio.get_event_loop().run_until_complete(main(args.timeout))
    
    
    Using a 0.01 timeout
    Fetching IP from ip-api
    Fetching IP from ipify
    {'message': 'Result from asynchronous.', 'ip': 'not available'}
  • 相关阅读:
    error: device not found
    xcode-select: error: tool 'xcodebuild' requires Xcode, but active developer directory '/Library/Deve
    联想X系列服务器
    华为服务器
    linux db2升级
    aix6.1升级openssh&&openssl
    upgrading mysql: error: 1102: Incorrect database name
    linux7配置yum网络源
    How to install fixpack on DB2
    mysql 表空间管理
  • 原文地址:https://www.cnblogs.com/tcppdu/p/10083991.html
Copyright © 2011-2022 走看看