asyncio
https://docs.python.org/3.7/library/asyncio.html
An asynchronous programming library that supports concurrent execution and provides a set of high-level APIs.
asyncio is a library to write concurrent code using the async/await syntax.
asyncio is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web-servers, database connection libraries, distributed task queues, etc.
asyncio is often a perfect fit for IO-bound and high-level structured network code.
asyncio provides a set of high-level APIs to:
run Python coroutines concurrently and have full control over their execution;
perform network IO and IPC;
control subprocesses;
distribute tasks via queues;
synchronize concurrent code;
https://pymotw.com/3/asyncio/index.html
Compared with threads and processes, coroutines run in a single process and a single thread, so they incur less system overhead.
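A minimal sketch of that claim, assuming a modern CPython: it starts 10,000 coroutines that each sleep for 0.1 seconds and records which OS thread runs them. The helper names `tick` and `main` are illustrative only. All of them run on one thread, and because they sleep concurrently the total time stays close to 0.1 seconds plus scheduling overhead rather than growing with the count.

```python
import asyncio
import threading


async def tick(i, seen_threads):
    # Record which OS thread runs this coroutine, then yield to the event loop.
    seen_threads.add(threading.get_ident())
    await asyncio.sleep(0.1)
    return i


async def main(n=10_000):
    seen_threads = set()
    loop = asyncio.get_running_loop()
    start = loop.time()
    # Every coroutine is wrapped in a Task; they all sleep at the same time.
    results = await asyncio.gather(*(tick(i, seen_threads) for i in range(n)))
    elapsed = loop.time() - start
    return len(results), len(seen_threads), elapsed


if __name__ == "__main__":
    count, threads, elapsed = asyncio.run(main())
    print(f"{count} coroutines on {threads} thread(s) in {elapsed:.2f}s")
```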
The asyncio module provides tools for building concurrent applications using coroutines. While the threading module implements concurrency through application threads and multiprocessing implements concurrency using system processes, asyncio uses a single-threaded, single-process approach in which parts of an application cooperate to switch tasks explicitly at optimal times. Most often this context switching occurs when the program would otherwise block waiting to read or write data, but asyncio also includes support for scheduling code to run at a specific future time, to enable one coroutine to wait for another to complete, for handling system signals, and for recognizing other events that may be reasons for an application to change what it is working on.
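The paragraph mentions scheduling code to run at a specific future time and letting one coroutine wait for something else to finish. A small sketch of both, assuming `loop.call_later` for the timers and a Future as the thing being waited on (the callback name `on_timer` and the labels are hypothetical):

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    events = []
    done = loop.create_future()

    def on_timer(label):
        # Plain callback (not a coroutine) that the event loop runs later.
        events.append(label)
        if label == "second":
            done.set_result(events)

    # Schedule callbacks relative to now, in seconds; registration order
    # does not matter, only the delay does.
    loop.call_later(0.2, on_timer, "second")
    loop.call_later(0.1, on_timer, "first")

    # This coroutine suspends until the second callback resolves the future.
    return await done


if __name__ == "__main__":
    print(asyncio.run(main()))
```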
Coroutines -- sequential execution
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    # Each await finishes before the next one starts: about 3 seconds in total.
    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
Tasks -- concurrent execution
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    # create_task() schedules both coroutines on the event loop right away.
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
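API for managing concurrent tasks - gather
When many concurrent tasks are created, managing each of them with its own await becomes tedious.
In that case, use the gather API.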
awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)
Run awaitable objects in the aws sequence concurrently.
If any awaitable in aws is a coroutine, it is automatically scheduled as a Task.
If all awaitables are completed successfully, the result is an aggregate list of returned values. The order of result values corresponds to the order of awaitables in aws.
This API accepts not only Tasks but also bare coroutines, which it wraps in Tasks automatically.
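A sketch of the two properties quoted above, assuming Python 3.7+: `gather` accepts a mix of an explicit Task and bare coroutines, and its result list follows the order of the arguments even when the awaitables finish in a different order. The second function shows the `return_exceptions=True` flag from the signature, which returns an exception as a result value instead of raising it. All helper names here are hypothetical.

```python
import asyncio


async def delayed(value, delay):
    await asyncio.sleep(delay)
    return value


async def main():
    finished = []

    async def track(value, delay):
        result = await delayed(value, delay)
        finished.append(result)  # records the order of completion
        return result

    # One explicit Task plus two bare coroutines; gather wraps the coroutines.
    task = asyncio.create_task(track("slow", 0.3))
    results = await asyncio.gather(task, track("fast", 0.1), track("medium", 0.2))
    return results, finished


async def failing():
    raise ValueError("boom")


async def with_errors():
    # With return_exceptions=True the ValueError appears in the result list.
    return await asyncio.gather(delayed(1, 0), failing(), return_exceptions=True)


if __name__ == "__main__":
    print(asyncio.run(main()))
    print(asyncio.run(with_errors()))
```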
Awaitables
We say that an object is an awaitable object if it can be used in an await expression. Many asyncio APIs are designed to accept awaitables. There are three main types of awaitable objects: coroutines, Tasks, and Futures.
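A minimal sketch that awaits one object of each of the three types named above. The coroutine `compute` is a hypothetical helper; the Future is created with `loop.create_future()` and resolved by a callback scheduled with `loop.call_soon`.

```python
import asyncio


async def compute():
    await asyncio.sleep(0.01)
    return 42


async def main():
    # 1. A coroutine object: awaiting it runs it to completion.
    from_coroutine = await compute()

    # 2. A Task: scheduled on the event loop as soon as it is created.
    task = asyncio.create_task(compute())
    from_task = await task

    # 3. A Future: a low-level placeholder whose result is set elsewhere.
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    loop.call_soon(future.set_result, 42)
    from_future = await future

    return from_coroutine, from_task, from_future


if __name__ == "__main__":
    print(asyncio.run(main()))
```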
import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    # Schedule three calls *concurrently*:
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

asyncio.run(main())
Streams
Streams are high-level async/await-ready primitives to work with network connections. Streams allow sending and receiving data without using callbacks or low-level protocols and transports.
https://docs.python.org/3.7/library/asyncio-stream.html
client
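import asyncio

async def tcp_echo_client(message):
    # Assumes the echo server below is already listening on 127.0.0.1:8888.
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()  # flow control: wait until it is OK to write more

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()  # StreamWriter.wait_closed() exists since 3.7

asyncio.run(tcp_echo_client('Hello World!'))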
server
import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')

    async with server:
        await server.serve_forever()

asyncio.run(main())
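The client and server examples above run as two separate scripts. For a quick self-test, a sketch like the following runs both in one process. It assumes port 0 (the OS picks any free port) instead of the fixed 8888, and the function name `echo_roundtrip` is hypothetical.

```python
import asyncio


async def handle_echo(reader, writer):
    # Echo back whatever the client sends, then close the connection.
    data = await reader.read(100)
    writer.write(data)
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def echo_roundtrip(message):
    # Port 0 lets the OS choose a free port, avoiding clashes with 8888.
    server = await asyncio.start_server(handle_echo, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:  # the server is closed when this block exits
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        writer.write(message.encode())
        await writer.drain()
        data = await reader.read(100)
        writer.close()
        await writer.wait_closed()
    return data.decode()


if __name__ == "__main__":
    print(asyncio.run(echo_roundtrip("Hello World!")))
```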
Coroutine synchronization
https://docs.python.org/3.7/library/asyncio-sync.html
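Coroutine synchronization is similar to thread synchronization, but the primitives themselves are not thread-safe. They can only be used between coroutines, i.e. within a single process and the single thread that runs the event loop.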
asyncio synchronization primitives are designed to be similar to those of the threading module with two important caveats:
asyncio primitives are not thread-safe, therefore they should not be used for OS thread synchronization (use threading for that);
methods of these synchronization primitives do not accept the timeout argument; use the asyncio.wait_for() function to perform operations with timeouts.
asyncio has the following basic synchronization primitives: Lock, Event, Condition, Semaphore, BoundedSemaphore.
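A sketch of two of these points, assuming Python 3.7+: an `asyncio.Lock` protecting a read-modify-write that yields in the middle, and `asyncio.wait_for()` supplying the timeout that `Event.wait()` itself does not accept. The dict-based counter and helper names are hypothetical.

```python
import asyncio


async def increment(counter, lock):
    # Without the lock, every task could read the same old value at the
    # await point below and the final count would be far below 100.
    async with lock:
        current = counter["value"]
        await asyncio.sleep(0)  # yield to other tasks mid-update
        counter["value"] = current + 1


async def main():
    lock = asyncio.Lock()
    counter = {"value": 0}
    await asyncio.gather(*(increment(counter, lock) for _ in range(100)))

    # Primitive methods take no timeout argument; wrap them in wait_for().
    event = asyncio.Event()  # never set, so the wait must time out
    try:
        await asyncio.wait_for(event.wait(), timeout=0.1)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return counter["value"], timed_out


if __name__ == "__main__":
    print(asyncio.run(main()))
```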
Subprocess management
https://docs.python.org/3.7/library/asyncio-subprocess.html
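Compared with the subprocess module, it wraps child processes in async/await APIs, so a coroutine can wait for a subprocess without blocking the event loop.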
This section describes high-level async/await asyncio APIs to create and manage subprocesses.
asyncio.subprocess
import asyncio

async def run(cmd):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)

    stdout, stderr = await proc.communicate()

    print(f'[{cmd!r} exited with {proc.returncode}]')
    if stdout:
        print(f'[stdout]\n{stdout.decode()}')
    if stderr:
        print(f'[stderr]\n{stderr.decode()}')

asyncio.run(run('ls /zzz'))
subprocess
import subprocess

try:
    completed = subprocess.run(
        'echo to stdout; echo to stderr 1>&2; exit 1',
        shell=True,
        # Discard both streams, so completed.stdout and .stderr are None.
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
except subprocess.CalledProcessError as err:
    # Only raised when check=True is passed; without it, the nonzero
    # exit status is just reported in completed.returncode.
    print('ERROR:', err)
else:
    print('returncode:', completed.returncode)
    print('stdout is {!r}'.format(completed.stdout))
    print('stderr is {!r}'.format(completed.stderr))
asyncio.subprocess -- reading one line of output
import asyncio
import sys

async def get_date():
    code = 'import datetime; print(datetime.datetime.now())'

    # Create the subprocess; redirect the standard output
    # into a pipe.
    proc = await asyncio.create_subprocess_exec(
        sys.executable, '-c', code,
        stdout=asyncio.subprocess.PIPE)

    # Read one line of output.
    data = await proc.stdout.readline()
    line = data.decode('ascii').rstrip()

    # Wait for the subprocess exit.
    await proc.wait()
    return line

# On Python 3.7 the Windows default loop cannot run subprocesses.
if sys.platform == "win32":
    asyncio.set_event_loop_policy(
        asyncio.WindowsProactorEventLoopPolicy())

date = asyncio.run(get_date())
print(f"Current date: {date}")
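The advantage over the plain subprocess module shows up when several child processes run at once. A sketch, assuming Python 3.8+ (where Windows already defaults to the proactor loop; on 3.7 set the policy as in the example above): three child interpreters each sleep 0.5 seconds, and `gather` waits on all of them concurrently, so the total is roughly one sleep plus startup cost rather than three. The helper `run_python` and the snippets are hypothetical.

```python
import asyncio
import sys


async def run_python(code):
    # Start a child Python interpreter and capture its standard output.
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", code,
        stdout=asyncio.subprocess.PIPE)
    stdout, _ = await proc.communicate()
    return proc.returncode, stdout.decode().strip()


async def main():
    snippets = [f"import time; time.sleep(0.5); print({i} * {i})" for i in range(3)]
    loop = asyncio.get_running_loop()
    start = loop.time()
    # All three children run at the same time; results keep argument order.
    results = await asyncio.gather(*(run_python(s) for s in snippets))
    return results, loop.time() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(main())
    print(results, f"{elapsed:.2f}s")
```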
Coroutine communication -- queues
https://docs.python.org/3.7/library/asyncio-queue.html
asyncio queues are designed to be similar to classes of the queue module. Although asyncio queues are not thread-safe, they are designed to be used specifically in async/await code.
Queues can be used to distribute workload between several concurrent tasks:
import asyncio
import random
import time

async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')

async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')

asyncio.run(main())
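Like queue.Queue, asyncio.Queue can be bounded with `maxsize`, which gives simple backpressure: `await queue.put()` suspends the producer while the queue is full. A minimal producer/consumer sketch, assuming `None` as a hypothetical end-of-work sentinel (a convention chosen here, not part of the asyncio API):

```python
import asyncio


async def producer(queue, items):
    for item in items:
        # put() suspends while the queue already holds maxsize items.
        await queue.put(item)
    await queue.put(None)  # sentinel: no more work (our own convention)


async def consumer(queue, out):
    while True:
        item = await queue.get()
        queue.task_done()
        if item is None:
            break
        out.append(item * 2)


async def main():
    queue = asyncio.Queue(maxsize=2)
    out = []
    await asyncio.gather(producer(queue, range(5)), consumer(queue, out))
    return out


if __name__ == "__main__":
    print(asyncio.run(main()))
```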