zoukankan      html  css  js  c++  java
  • asyncio异步IO——Streams详解

    前言

    本文翻译自python3.7官方文档——asyncio-stream,译者马鸣谦,邮箱 1612557569@qq.com。转载请注明出处。

    数据流(Streams)

    数据流(Streams)是用于处理网络连接的高阶异步/等待就绪(async/await-ready)原语,可以在不使用回调和底层传输协议的情况下发送和接收数据。

    以下是一个用asyncio实现的TCP回显客户端:

    import asyncio
    
    async def tcp_echo_client(message):
        reader, writer = await asyncio.open_connection(
            '127.0.0.1', 8888)
    
        print(f'Send: {message!r}')
        writer.write(message.encode())
    
        data = await reader.read(100)
        print(f'Received: {data.decode()!r}')
    
        print('Close the connection')
        writer.close()
        await writer.wait_closed()
    
    asyncio.run(tcp_echo_client('Hello World!'))
    

    完整代码见例子一节。

    Stream方法

    以下所列的高层asyncio方法可以被用作创建和处理Stream:

    • coroutine asyncio.open_connection(host=None,*,loop=None,limit=None,ssl=None,family=0,proto=0,flags=0,sock=None,local_addr=None,server_hostname=None,ssl_handshake_timeout=None)

      • 创建一个网络连接,并返回一对(reader,writer)对象。
      • 返回的readerwriter对象是StreamReaderStreamWriter类的实例。
      • loop是可选参数,在此方法被某个协程await时能够自动确定。
      • limit限定返回的StreamReader实例使用的缓冲区大小。默认情况下,缓冲区限制为64KiB
      • 其余的参数被直接传递给loop.create_connection()
      • python3.7新增ssl_handshake_timeout参数。
    • coroutine asyncio.start_server(client_connected_cb,host=None,port=None,*,loop=None,limit=None,family=socket.AF_UNSPEC,flags=socket.AI_PASSIVE,sock=None,backlog=100,ssl=None,reuse_address=None,reuse_port=None,ssl_handshake_timeout=None,start_serving=True)

      • 启动一个socket服务端。
      • client_connected_cb指定的回调函数,在新连接建立的时候被调用。该回调函数接收StreamReaderStreamWriter类的‘实例对’(reader,writer)作为两个参数。
      • client_connected_cb可以是普通的可调用函数,也可以是协程函数。如果是协程函数,那么会被自动封装为Task对象处理。
      • loop是可选参数,在此方法被某个协程await时能够自动确定。
      • limit限定返回的StreamReader实例使用的缓冲区大小。默认情况下,缓冲区限定值为64KiB
      • 其余的参数被直接传递给loop.create_server()
      • python3.7新增ssl_handshake_timeoutstart_serving参数。

    Unix Sockets

    • coroutine asyncio.open_unix_connection(path=None,*,loop=None,limit=None,ssl=None,sock=None,server_hostname=None,ssl_handshake_timeout=None)

      • 创建一个Unix socket连接,并返回一对(reader,writer)对象。
      • open_connection类似,只是运行在Unix sockets上。
      • 另见loop.create_unix_connection()
      • 可用于:Unix
      • python3.7新增ssl_handshake_timeout参数。
      • python3.7修正path参数可以为类path(path-like)对象
    • coroutine *asyncio.start_unix_server(client_connected_cb, path=None, , loop=None, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)

      • 启动一个Unix socket 服务端。
      • 类似于start_server,只是运行在Unix sockets上。
      • 另见loop.create_unix_server
      • 可用于:Unix
      • python3.7新增ssl_handshake_timeout参数。
      • python3.7修正path参数可以为类path(path-like)对象

    StreamReader

    class asyncio.StreamReader

    定义一个读取器对象,提供从IO数据流中读取数据的API。
    不建议 直接实例化StreamReader对象。建议通过open_connection()start_server()创建此类对象。

    • coroutine read(n=-1)

      • 最多读取n字节数据。如果n未设置,或被设置为-1,则读取至EOF标志,并返回读到的所有字节。
      • 如果在缓冲区仍为空时遇到EOF,则返回一个空的bytes对象。
    • coroutine readline()

      • 读取一行(以 为标志)。
      • 如果在找到 之前遇到EOF,则返回已读取到的数据段。
      • 如果遇到EOF时内部缓冲区仍为空,则返回空的bytes对象。
    • coroutine readexactly(n)

      • 精确读取n字节数据。
      • 如果在尚未读够n字节时遇到EOF,则引发IncompleteReadError异常。已经读取的部分数据可以通过IncompleteReadError.partial属性获取。
    • coroutine readuntil(separator=b' ')

      • 从数据流中读取数据直到遇到separator
      • 如果执行成功,读到的数据和分隔符将从内部缓冲区里移除。返回的数据会在末尾包含分隔符。
      • 如果读取数据的总量超过了配置的数据流缓冲区限制,则引发LimitOverrunError,数据会被留在内部缓冲区中,可以被再次读取。
      • 如果在找到separator分隔符之前遇到EOF,则引发IncompleteReadError异常,内部缓冲区会被重置。IncompleteReadError.partial属性会包含部分separator
      • python3.5.2新增。
    • at_eof()

      • 如果缓冲区为空,且feed_eof()被调用,则返回True

    StreamWriter

    class asyncio.StreamWriter

    定义一个写入器对象,提供向IO数据流中写入数据的API。
    不建议直接实例化StreamWriter对象,建议通过open_connectionstart_server实例化对象。

    • can_writer_eof()

      • 如果下层传输支持write_eof方法,则返回True,否则返回False
    • write_eof()

      • 在缓冲的写入数据被刷新后,关闭数据流的写入端。
    • transport

      • 返回下层的asyncio传输。
    • get_extra_info(name,default=None)

      • 访问可选的传输信息。
    • write(data)

      • 向数据流中写入数据。
      • 此方法不受流量控制的影响。write()应同drain()一同使用。
    • writelines()

      • 向数据流中写入bytes列表(或任何的可迭代对象)。
      • 此方法不受流量控制的影响。应与drain()一同使用。
    • coroutine drain()

      • 等待恢复数据写入的时机。例如:
      writer.write(data)
      await writer.drain()
      
      • 这是一个与底层IO输入缓冲区交互的流量控制方法。当缓冲区达到上限时,drain()阻塞,待到缓冲区回落到下限时,写操作可以被恢复。当不需要等待时,drain()会立即返回。
    • close()

      • 关闭数据流。
    • is_closing()

      • 如果数据流已经关闭或正在关闭,则返回True
    • coroutine wait_closed()

      • 保持等待,直到数据流关闭。
      • 保持等待,直到底层连接被关闭,应该在close()后调用此方法。
      • Python3.7新增。

    示例

    利用Stream实现TCP回显客户端

    import asyncio
    
    async def tcp_echo_client(message):
        reader, writer = await asyncio.open_connection(
            '127.0.0.1', 8888)
    
        print(f'Send: {message!r}')
        writer.write(message.encode())
    
        data = await reader.read(100)
        print(f'Received: {data.decode()!r}')
    
        print('Close the connection')
        writer.close()
    
    asyncio.run(tcp_echo_client('Hello World!'))
    

    利用Stream实现TCP回显服务端

    import asyncio
    
    async def handle_echo(reader, writer):
        data = await reader.read(100)
        message = data.decode()
        addr = writer.get_extra_info('peername')
    
        print(f"Received {message!r} from {addr!r}")
    
        print(f"Send: {message!r}")
        writer.write(data)
        await writer.drain()
    
        print("Close the connection")
        writer.close()
    
    async def main():
        server = await asyncio.start_server(
            handle_echo, '127.0.0.1', 8888)
    
        addr = server.sockets[0].getsockname()
        print(f'Serving on {addr}')
    
        async with server:
            await server.serve_forever()
    
    asyncio.run(main())
    

    获取HTTP头

    import asyncio
    import urllib.parse
    import sys
    
    async def print_http_headers(url):
        url = urllib.parse.urlsplit(url)
        if url.scheme == 'https':
            reader, writer = await asyncio.open_connection(
                url.hostname, 443, ssl=True)
        else:
            reader, writer = await asyncio.open_connection(
                url.hostname, 80)
    
        query = (
            f"HEAD {url.path or '/'} HTTP/1.0
    "
            f"Host: {url.hostname}
    "
            f"
    "
        )
    
        writer.write(query.encode('latin-1'))
        while True:
            line = await reader.readline()
            if not line:
                break
    
            line = line.decode('latin1').rstrip()
            if line:
                print(f'HTTP header> {line}')
    
        # Ignore the body, close the socket
        writer.close()
    
    url = sys.argv[1]
    asyncio.run(print_http_headers(url))
    

    用法:

    python example.py http://example.com/path/page.html
    

    或:

    python example.py https://example.com/path/page.html
    

    利用Stream注册等待数据的开放socket

    import asyncio
    import socket
    
    async def wait_for_data():
        # Get a reference to the current event loop because
        # we want to access low-level APIs.
        loop = asyncio.get_running_loop()
    
        # Create a pair of connected sockets.
        rsock, wsock = socket.socketpair()
    
        # Register the open socket to wait for data.
        reader, writer = await asyncio.open_connection(sock=rsock)
    
        # Simulate the reception of data from the network
        loop.call_soon(wsock.send, 'abc'.encode())
    
        # Wait for data
        data = await reader.read(100)
    
        # Got data, we are done: close the socket
        print("Received:", data.decode())
        writer.close()
    
        # Close the second socket
        wsock.close()
    
    asyncio.run(wait_for_data())
    
  • 相关阅读:
    小程序 生成二维码
    uni-app调用wifi接口
    微信小程序代码上传,审核发布小程序
    uni-app开发经验分享十五: uni-app 蓝牙打印功能
    面试题 16.11. 跳水板
    LeetCode 63. 不同路径 II
    LeetCode 44. 通配符匹配
    LeetCode 108. 将有序数组转换为二叉搜索树
    LeetCode 718. 最长重复子数组
    LeetCode 814. 二叉树剪枝
  • 原文地址:https://www.cnblogs.com/mamingqian/p/10044730.html
Copyright © 2011-2022 走看看