  • asyncio

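    I. Introduction

    asyncio is a standard library module introduced in Python 3.4 that provides built-in support for asynchronous I/O.

    The asyncio programming model is built around an event loop. We obtain a reference to an EventLoop from the asyncio module and hand it the coroutines we want to run; the loop executes them, which gives us asynchronous I/O.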
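
    As a minimal sketch of that model (not code from the original post), the snippet below creates an event loop, hands it one coroutine, and waits for the result. The names greet and run_greeting are made up for illustration, and the async/await syntax assumes Python 3.5 or later.

```python
import asyncio

async def greet(name):
    # Suspend briefly; the event loop is free to run other work meanwhile.
    await asyncio.sleep(0.01)
    return "Hello, %s" % name

def run_greeting(name):
    # Get an event loop, give it the coroutine, and wait for the result.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(greet(name))
    finally:
        loop.close()

if __name__ == "__main__":
    print(run_greeting("asyncio"))
```

    On Python 3.7 and later, asyncio.run(greet("asyncio")) performs the same create, run, and close steps in one call.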

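    This module provides infrastructure for writing single-threaded concurrent code using coroutines, multiplexing I/O access over sockets and other resources, running network clients and servers, and other related primitives.

    The package provides the following:

    • a pluggable event loop with various system-specific implementations
    • transport and protocol abstractions (similar to those in Twisted)
    • concrete support for TCP, UDP, SSL, subprocess pipes, delayed calls, and others (some may be system-dependent)
    • a Future class that mimics the one in the concurrent.futures module, but adapted for use with the event loop
    • coroutines and tasks based on yield from (PEP 380), to help write concurrent code in a sequential fashion
    • cancellation support for Futures and coroutines
    • synchronization primitives for use between coroutines in a single thread, mimicking those in the threading module
    • an interface for passing work off to a thread pool, for the times when you absolutely have to use a library that makes blocking I/O calls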
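
    The last item in the list can be sketched as follows. This is only an illustration under stated assumptions: blocking_read is a hypothetical stand-in for a blocking library call, and asyncio.get_running_loop() and asyncio.run() assume Python 3.7 or later.

```python
import asyncio
import time

def blocking_read(delay):
    # Hypothetical stand-in for a library call that blocks the calling thread.
    time.sleep(delay)
    return "done after %.2fs" % delay

async def main():
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default thread pool executor.
    # Both calls run in worker threads, so the event loop is not blocked.
    return await asyncio.gather(
        loop.run_in_executor(None, blocking_read, 0.05),
        loop.run_in_executor(None, blocking_read, 0.05),
    )

if __name__ == "__main__":
    print(asyncio.run(main()))
```

    The event loop keeps servicing other coroutines while the worker threads sleep, which is the point of handing blocking calls to an executor.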

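    II. References

    Chinese documentation: http://python.usyiyi.cn/translate/python_352/library/asyncio.html

    Official documentation: https://docs.python.org/3/library/asyncio.html

    III. Key examples

    This post does not cover the asyncio module in detail; the examples below are meant as a quick way to learn it:

    1. An example that uses the AbstractEventLoop.call_soon() method to schedule a callback. The callback prints "Hello World" and then stops the event loop: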

    import asyncio
    
    def hello_world(loop):
        print('Hello World')
        loop.stop()
    
    loop = asyncio.new_event_loop()
    
    # Schedule a call to hello_world()
    loop.call_soon(hello_world, loop)
    
    # Blocking call interrupted by loop.stop()
    loop.run_forever()
    loop.close()
    

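    2. A callback that displays the current date every second. The callback uses the AbstractEventLoop.call_later() method to reschedule itself for 5 seconds, then stops the event loop: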

    import asyncio
    import datetime
    
    def display_date(end_time, loop):
        print(datetime.datetime.now())
        if (loop.time() + 1.0) < end_time:
            loop.call_later(1, display_date, end_time, loop)
        else:
            loop.stop()
    
    loop = asyncio.new_event_loop()
    
    # Schedule the first call to display_date()
    end_time = loop.time() + 5.0
    loop.call_soon(display_date, end_time, loop)
    
    # Blocking call interrupted by loop.stop()
    loop.run_forever()
    loop.close()
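
    For comparison, the same behaviour can be written as a coroutine instead of a self-rescheduling callback. This is a sketch rather than part of the original post: the duration and interval parameters and the returned list of timestamps are additions for illustration, and asyncio.get_running_loop() assumes Python 3.7 or later.

```python
import asyncio
import datetime

async def display_date(duration, interval=1.0):
    loop = asyncio.get_running_loop()
    end_time = loop.time() + duration
    stamps = []
    while True:
        now = datetime.datetime.now()
        print(now)
        stamps.append(now)
        # Stop once the next tick would fall past the deadline.
        if loop.time() + interval >= end_time:
            break
        await asyncio.sleep(interval)
    return stamps

if __name__ == "__main__":
    asyncio.run(display_date(5.0))
```

    The loop reads top to bottom, and no explicit loop.stop() is needed because the coroutine simply returns.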
    

    3. Wait until a file descriptor receives some data using the AbstractEventLoop.add_reader() method, then close the event loop:

    import asyncio
    try:
        from socket import socketpair
    except ImportError:
        from asyncio.windows_utils import socketpair
    
    # Create a pair of connected file descriptors
    rsock, wsock = socketpair()
    loop = asyncio.new_event_loop()
    
    def reader():
        data = rsock.recv(100)
        print("Received:", data.decode())
        # We are done: unregister the file descriptor
        loop.remove_reader(rsock)
        # Stop the event loop
        loop.stop()
    
    # Register the file descriptor for read event
    loop.add_reader(rsock, reader)
    
    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())
    
    # Run the event loop
    loop.run_forever()
    
    # We are done, close sockets and the event loop
    rsock.close()
    wsock.close()
    loop.close()
    

    4. Register handlers for the SIGINT and SIGTERM signals using the AbstractEventLoop.add_signal_handler() method:

    import asyncio
    import functools
    import os
    import signal
    
    def ask_exit(signame):
        print("got signal %s: exit" % signame)
        loop.stop()
    
    loop = asyncio.new_event_loop()
    for signame in ('SIGINT', 'SIGTERM'):
        loop.add_signal_handler(getattr(signal, signame),
                                functools.partial(ask_exit, signame))
    
    print("Event loop running forever, press Ctrl+C to interrupt.")
    print("pid %s: send SIGINT or SIGTERM to exit." % os.getpid())
    try:
        loop.run_forever()
    finally:
        loop.close()
    
    # This example only works on UNIX.
    

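    5. An example combining a Future and a coroutine function:

    The coroutine function is responsible for the computation (which takes 1 second) and stores the result in the Future. The run_until_complete() method waits for the Future to complete.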

    import asyncio
    
    async def slow_operation(future):
        await asyncio.sleep(1)
        future.set_result('Future is done!')
    
    loop = asyncio.new_event_loop()
    # Create the Future and the Task on this loop explicitly
    future = loop.create_future()
    loop.create_task(slow_operation(future))
    loop.run_until_complete(future)
    print(future.result())
    loop.close()
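
    When no separate Future is needed, a coroutine can also return its value directly. The sketch below is an illustrative variant, not the post's code: run() is a made-up helper name, and the delay is shortened so the example finishes quickly.

```python
import asyncio

async def slow_operation():
    await asyncio.sleep(0.01)
    return 'Future is done!'

def run():
    loop = asyncio.new_event_loop()
    try:
        # run_until_complete() wraps the coroutine in a Task (a Future
        # subclass), waits for it, and returns its result.
        return loop.run_until_complete(slow_operation())
    finally:
        loop.close()

if __name__ == "__main__":
    print(run())
```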
    

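    6. The previous example written differently, using the Future.add_done_callback() method to describe the control flow explicitly:

    In this example, the Future is used to link slow_operation() to got_result(): when slow_operation() is done, got_result() is called with the result.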

    import asyncio
    
    async def slow_operation(future):
        await asyncio.sleep(1)
        future.set_result('Future is done!')
    
    def got_result(future):
        print(future.result())
        loop.stop()
    
    loop = asyncio.new_event_loop()
    # Create the Future and the Task on this loop explicitly
    future = loop.create_future()
    loop.create_task(slow_operation(future))
    future.add_done_callback(got_result)
    try:
        loop.run_forever()
    finally:
        loop.close()
    

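    7. An example that runs 3 tasks (A, B, C) concurrently:

    A task is automatically scheduled for execution when it is created. The event loop stops when all tasks are done.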

    import asyncio
    
    async def factorial(name, number):
        f = 1
        for i in range(2, number+1):
            print("Task %s: Compute factorial(%s)..." % (name, i))
            await asyncio.sleep(1)
            f *= i
        print("Task %s: factorial(%s) = %s" % (name, number, f))
    
    loop = asyncio.new_event_loop()
    tasks = [
        loop.create_task(factorial("A", 2)),
        loop.create_task(factorial("B", 3)),
        loop.create_task(factorial("C", 4))]
    loop.run_until_complete(asyncio.gather(*tasks))
    loop.close()
    

    output:

    Task A: Compute factorial(2)...
    Task B: Compute factorial(2)...
    Task C: Compute factorial(2)...
    Task A: factorial(2) = 2
    Task B: Compute factorial(3)...
    Task C: Compute factorial(3)...
    Task B: factorial(3) = 6
    Task C: Compute factorial(4)...
    Task C: factorial(4) = 24
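
    If the results are needed rather than just printed, asyncio.gather() also returns them in the order the awaitables were passed. The following is a sketch under that assumption, with the printing removed and the one-second sleep replaced by sleep(0) so it finishes immediately; it is a variation on the example above, not the original code.

```python
import asyncio

async def factorial(number):
    f = 1
    for i in range(2, number + 1):
        # sleep(0) yields control to the event loop between steps.
        await asyncio.sleep(0)
        f *= i
    return f

async def main():
    # Results come back in argument order, regardless of completion order.
    return await asyncio.gather(factorial(2), factorial(3), factorial(4))

if __name__ == "__main__":
    print(asyncio.run(main()))
```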
    

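    8. A TCP echo client using the AbstractEventLoop.create_connection() method, and a TCP echo server using the AbstractEventLoop.create_server() method

    Client:

    The event loop runs twice. In this short example, run_until_complete() is preferred for the connection step because it raises an exception if the server is not listening, which avoids writing a short coroutine to handle the exception and stop the running loop. When run_until_complete() exits, the loop is no longer running, so there is no need to stop the loop if an error occurs.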

    import asyncio
    
    class EchoClientProtocol(asyncio.Protocol):
        def __init__(self, message, loop):
            self.message = message
            self.loop = loop
    
        def connection_made(self, transport):
            transport.write(self.message.encode())
            print('Data sent: {!r}'.format(self.message))
    
        def data_received(self, data):
            print('Data received: {!r}'.format(data.decode()))
    
        def connection_lost(self, exc):
            print('The server closed the connection')
            print('Stop the event loop')
            self.loop.stop()
    
    loop = asyncio.new_event_loop()
    message = 'Hello World!'
    coro = loop.create_connection(lambda: EchoClientProtocol(message, loop),
                                  '127.0.0.1', 8888)
    loop.run_until_complete(coro)
    loop.run_forever()
    loop.close()
    

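    Server:

    Transport.close() can be called immediately after WriteTransport.write(), even if the data has not been sent on the socket yet: both methods are asynchronous. yield from is not needed because these transport methods are not coroutines.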

    import asyncio
    
    class EchoServerClientProtocol(asyncio.Protocol):
        def connection_made(self, transport):
            peername = transport.get_extra_info('peername')
            print('Connection from {}'.format(peername))
            self.transport = transport
    
        def data_received(self, data):
            message = data.decode()
            print('Data received: {!r}'.format(message))
    
            print('Send: {!r}'.format(message))
            self.transport.write(data)
    
            print('Close the client socket')
            self.transport.close()
    
    loop = asyncio.new_event_loop()
    # Each client connection will create a new protocol instance
    coro = loop.create_server(EchoServerClientProtocol, '127.0.0.1', 8888)
    server = loop.run_until_complete(coro)
    
    # Serve requests until Ctrl+C is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    
    # Close the server
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()
    

    9. A UDP echo client and a UDP echo server, both using the AbstractEventLoop.create_datagram_endpoint() method

    Client:

    import asyncio
    
    class EchoClientProtocol:
        def __init__(self, message, loop):
            self.message = message
            self.loop = loop
            self.transport = None
    
        def connection_made(self, transport):
            self.transport = transport
            print('Send:', self.message)
            self.transport.sendto(self.message.encode())
    
        def datagram_received(self, data, addr):
            print("Received:", data.decode())
    
            print("Close the socket")
            self.transport.close()
    
        def error_received(self, exc):
            print('Error received:', exc)
    
        def connection_lost(self, exc):
            print("Socket closed, stop the event loop")
            # Stop the loop that was passed to the constructor
            self.loop.stop()
    
    loop = asyncio.new_event_loop()
    message = "Hello World!"
    connect = loop.create_datagram_endpoint(
        lambda: EchoClientProtocol(message, loop),
        remote_addr=('127.0.0.1', 9999))
    transport, protocol = loop.run_until_complete(connect)
    loop.run_forever()
    transport.close()
    loop.close()
    

    Server:

    import asyncio
    
    class EchoServerProtocol:
        def connection_made(self, transport):
            self.transport = transport
    
        def datagram_received(self, data, addr):
            message = data.decode()
            print('Received %r from %s' % (message, addr))
            print('Send %r to %s' % (message, addr))
            self.transport.sendto(data, addr)
    
    loop = asyncio.new_event_loop()
    print("Starting UDP server")
    # One protocol instance will be created to serve all client requests
    listen = loop.create_datagram_endpoint(
        EchoServerProtocol, local_addr=('127.0.0.1', 9999))
    transport, protocol = loop.run_until_complete(listen)
    
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    
    transport.close()
    loop.close()
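
    To try the UDP exchange without starting two scripts, both endpoints can live in one process. The sketch below makes several assumptions that are not in the original: port 0 lets the OS choose a free port, the client reports the reply through a Future instead of stopping the loop, udp_roundtrip is a made-up helper name, and asyncio.run() requires Python 3.7 or later.

```python
import asyncio

class EchoServerProtocol(asyncio.DatagramProtocol):
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        # Send the datagram straight back to its sender.
        self.transport.sendto(data, addr)

class EchoClientProtocol(asyncio.DatagramProtocol):
    def __init__(self, message, reply):
        self.message = message
        self.reply = reply  # Future resolved with the echoed payload

    def connection_made(self, transport):
        transport.sendto(self.message.encode())

    def datagram_received(self, data, addr):
        if not self.reply.done():
            self.reply.set_result(data.decode())

    def error_received(self, exc):
        if not self.reply.done():
            self.reply.set_exception(exc)

async def udp_roundtrip(message):
    loop = asyncio.get_running_loop()
    # Port 0 asks the OS for any free port (an assumption of this sketch).
    server_transport, _ = await loop.create_datagram_endpoint(
        EchoServerProtocol, local_addr=('127.0.0.1', 0))
    server_addr = server_transport.get_extra_info('sockname')
    reply = loop.create_future()
    client_transport, _ = await loop.create_datagram_endpoint(
        lambda: EchoClientProtocol(message, reply), remote_addr=server_addr)
    try:
        # UDP gives no delivery guarantee, so bound the wait.
        return await asyncio.wait_for(reply, timeout=1.0)
    finally:
        client_transport.close()
        server_transport.close()

if __name__ == "__main__":
    print(asyncio.run(udp_roundtrip("Hello World!")))
```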
    

    10. Wait until a socket receives data using the AbstractEventLoop.create_connection() method with a protocol, then close the event loop:

    import asyncio
    try:
        from socket import socketpair
    except ImportError:
        from asyncio.windows_utils import socketpair
    
    # Create a pair of connected sockets
    rsock, wsock = socketpair()
    loop = asyncio.new_event_loop()
    
    class MyProtocol(asyncio.Protocol):
        transport = None
    
        def connection_made(self, transport):
            self.transport = transport
    
        def data_received(self, data):
            print("Received:", data.decode())
    
            # We are done: close the transport (it will call connection_lost())
            self.transport.close()
    
        def connection_lost(self, exc):
            # The socket has been closed, stop the event loop
            loop.stop()
    
    # Register the socket to wait for data
    connect_coro = loop.create_connection(MyProtocol, sock=rsock)
    transport, protocol = loop.run_until_complete(connect_coro)
    
    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())
    
    # Run the event loop
    loop.run_forever()
    
    # We are done, close sockets and the event loop
    rsock.close()
    wsock.close()
    loop.close()
    

    11. A TCP echo client using the asyncio.open_connection() function, and a TCP echo server using the asyncio.start_server() function

    Client:

    import asyncio
    
    async def tcp_echo_client(message):
        reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
    
        print('Send: %r' % message)
        writer.write(message.encode())
    
        data = await reader.read(100)
        print('Received: %r' % data.decode())
    
        print('Close the socket')
        writer.close()
    
    message = 'Hello World!'
    loop = asyncio.new_event_loop()
    loop.run_until_complete(tcp_echo_client(message))
    loop.close()
    

    Server:

    import asyncio
    
    async def handle_echo(reader, writer):
        data = await reader.read(100)
        message = data.decode()
        addr = writer.get_extra_info('peername')
        print("Received %r from %r" % (message, addr))
    
        print("Send: %r" % message)
        writer.write(data)
        await writer.drain()
    
        print("Close the client socket")
        writer.close()
    
    loop = asyncio.new_event_loop()
    coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888)
    server = loop.run_until_complete(coro)
    
    # Serve requests until Ctrl+C is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    
    # Close the server
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()
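
    The stream-based client and server can also be exercised together in a single process. The sketch below is an illustration under stated assumptions, not the original code: it binds to port 0 so the OS picks a free port instead of the fixed 8888, the client reads until end of stream rather than at most 100 bytes, echo_roundtrip is a made-up helper name, and writer.wait_closed() and asyncio.run() assume Python 3.7 or later.

```python
import asyncio

async def handle_echo(reader, writer):
    # Echo one chunk back to the client, then close the connection.
    data = await reader.read(100)
    writer.write(data)
    await writer.drain()
    writer.close()

async def echo_roundtrip(message):
    # Port 0 asks the OS for any free port (an assumption of this sketch).
    server = await asyncio.start_server(handle_echo, '127.0.0.1', 0)
    host, port = server.sockets[0].getsockname()[:2]
    try:
        reader, writer = await asyncio.open_connection(host, port)
        writer.write(message.encode())
        await writer.drain()
        # read() with no argument reads until the server closes the stream.
        data = await reader.read()
        writer.close()
        await writer.wait_closed()
        return data.decode()
    finally:
        server.close()
        await server.wait_closed()

if __name__ == "__main__":
    print(asyncio.run(echo_roundtrip("Hello World!")))
```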
    

    12. A simple example that fetches the HTTP headers of a URL passed on the command line:

    import asyncio
    import urllib.parse
    import sys
    
    async def print_http_headers(url):
        url = urllib.parse.urlsplit(url)
        if url.scheme == 'https':
            connect = asyncio.open_connection(url.hostname, 443, ssl=True)
        else:
            connect = asyncio.open_connection(url.hostname, 80)
        reader, writer = await connect
        # HTTP lines end with CRLF; an empty line terminates the request headers
        query = ('HEAD {path} HTTP/1.0\r\n'
                 'Host: {hostname}\r\n'
                 '\r\n').format(path=url.path or '/', hostname=url.hostname)
        writer.write(query.encode('latin-1'))
        while True:
            line = await reader.readline()
            if not line:
                break
            line = line.decode('latin1').rstrip()
            if line:
                print('HTTP header> %s' % line)
    
        # Ignore the body, close the socket
        writer.close()
    
    url = sys.argv[1]
    loop = asyncio.new_event_loop()
    loop.run_until_complete(print_http_headers(url))
    loop.close()
    
    # Usage:
    
    python example.py http://example.com/path/page.html
    
    # Using HTTPS:
    
    python example.py https://example.com/path/page.html
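
    The request that the script writes to the socket is plain text with CRLF line endings. As a small sketch of just that step, the helper below builds the bytes for a given URL; build_head_request is a hypothetical name introduced here so that the wire format can be inspected without opening a connection.

```python
import urllib.parse

def build_head_request(url):
    parts = urllib.parse.urlsplit(url)
    # Request line, Host header, then an empty line that ends the headers.
    request = ('HEAD {path} HTTP/1.0\r\n'
               'Host: {hostname}\r\n'
               '\r\n').format(path=parts.path or '/', hostname=parts.hostname)
    return request.encode('latin-1')

if __name__ == "__main__":
    print(build_head_request('http://example.com/path/page.html'))
```

    An empty path falls back to '/', which is why a bare URL such as https://example.com still produces a valid request line.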
    

    13. A coroutine that waits until a socket receives data, using the open_connection() function:

    import asyncio
    try:
        from socket import socketpair
    except ImportError:
        from asyncio.windows_utils import socketpair
    
    async def wait_for_data(loop):
        # Create a pair of connected sockets
        rsock, wsock = socketpair()
    
        # Register the open socket to wait for data
        reader, writer = await asyncio.open_connection(sock=rsock)
    
        # Simulate the reception of data from the network
        loop.call_soon(wsock.send, 'abc'.encode())
    
        # Wait for data
        data = await reader.read(100)
    
        # Got data, we are done: close the socket
        print("Received:", data.decode())
        writer.close()
    
        # Close the second socket
        wsock.close()
    
    loop = asyncio.new_event_loop()
    loop.run_until_complete(wait_for_data(loop))
    loop.close()
    

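    14. An example of a subprocess protocol that collects the output of a subprocess and waits for the subprocess to exit. The subprocess is created by the AbstractEventLoop.subprocess_exec() method: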

    import asyncio
    import sys
    
    class DateProtocol(asyncio.SubprocessProtocol):
        def __init__(self, exit_future):
            self.exit_future = exit_future
            self.output = bytearray()
    
        def pipe_data_received(self, fd, data):
            self.output.extend(data)
    
        def process_exited(self):
            self.exit_future.set_result(True)
    
    async def get_date(loop):
        code = 'import datetime; print(datetime.datetime.now())'
        exit_future = loop.create_future()
    
        # Create the subprocess controlled by the protocol DateProtocol,
        # redirect the standard output into a pipe
        create = loop.subprocess_exec(lambda: DateProtocol(exit_future),
                                      sys.executable, '-c', code,
                                      stdin=None, stderr=None)
        transport, protocol = await create
    
        # Wait for the subprocess exit using the process_exited() method
        # of the protocol
        await exit_future
    
        # Close the stdout pipe
        transport.close()
    
        # Read the output which was collected by the pipe_data_received()
        # method of the protocol
        data = bytes(protocol.output)
        return data.decode('ascii').rstrip()
    
    if sys.platform == "win32":
        loop = asyncio.ProactorEventLoop()
        asyncio.set_event_loop(loop)
    else:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    
    date = loop.run_until_complete(get_date(loop))
    print("Current date: %s" % date)
    loop.close()
    

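    15. An example that uses the Process class to control a subprocess and the StreamReader class to read from its standard output. The subprocess is created by the create_subprocess_exec() function: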

    import asyncio.subprocess
    import sys
    
    async def get_date():
        code = 'import datetime; print(datetime.datetime.now())'
    
        # Create the subprocess, redirect the standard output into a pipe
        create = asyncio.create_subprocess_exec(sys.executable, '-c', code,
                                                stdout=asyncio.subprocess.PIPE)
        proc = await create
    
        # Read one line of output
        data = await proc.stdout.readline()
        line = data.decode('ascii').rstrip()
    
        # Wait for the subprocess exit
        await proc.wait()
        return line
    
    if sys.platform == "win32":
        loop = asyncio.ProactorEventLoop()
        asyncio.set_event_loop(loop)
    else:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    
    date = loop.run_until_complete(get_date())
    print("Current date: %s" % date)
    loop.close()
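
    When all of the output is wanted rather than a single line, Process.communicate() reads the pipe to the end and waits for the process to exit. The sketch below is an illustrative variant: run_and_capture is a made-up helper name, and asyncio.run() assumes Python 3.7 or later.

```python
import asyncio
import sys

async def run_and_capture(code):
    # Start a child Python interpreter with stdout redirected into a pipe.
    proc = await asyncio.create_subprocess_exec(
        sys.executable, '-c', code,
        stdout=asyncio.subprocess.PIPE)
    # communicate() reads stdout until EOF and waits for the process to exit.
    stdout, _ = await proc.communicate()
    return proc.returncode, stdout.decode().rstrip()

if __name__ == "__main__":
    print(asyncio.run(run_and_capture(
        'import datetime; print(datetime.datetime.now())')))
```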
    

     

      

  • Original article: https://www.cnblogs.com/styier/p/6415850.html