zoukankan      html  css  js  c++  java
  • chapter12.4、IO概念及 多路复用

    IO概念及 多路复用

    异步编程

    同步异步

    函数或方法被调用的时候,调用者是否得到最终结果的

    直接得到最终结果结果的,就是同步调用;

    不直接得到最终结果的,就是异步调用。

    阻塞,非阻塞

    函数或方法调用的时候,是否立刻返回

    立即返回就是非阻塞调用;

    不立即返回就是阻塞调用。

    区别

    同步、异步,与阻塞、非阻塞不相关。

    同步、异步强调的是,是否得到(最终的)结果;

    阻塞、非阻塞强调是时间,是否等待。

    同步IO、异步IO、IO多路复用

    IO的两个阶段

    IO过程分两阶段:

    1.数据准备阶段

    2.内核空间复制回用户进程缓冲区阶段

    发生IO的时候:

    1、内核从输入设备读、写数据(淘米,把米放饭锅里煮饭)

    2、进程从内核复制数据(盛饭,从内核这个饭锅里面把饭装到碗里来)

    系统调用——read函数

    同步IO

    包括 阻塞IO,非阻塞IO,IO多路复用

    阻塞IO

    进程等待(阻塞),直到读写完成。(全程等待)

    非阻塞IO

    进程调用read操作,如果IO设备没有准备好,立即返回ERROR,进程不阻塞。用户可以再次发起系统调用,如果内核已经准备好,就阻塞,然后复制数据到用户空间。

    第一阶段数据没有准备好,就先忙别的,等会再来看看。检查数据是否准备好了的过程是非阻塞的。

    第二阶段是阻塞的,即内核空间和用户空间之间复制数据是阻塞的。

    IO多路复用

    所谓IO多路复用,就是同时监控多个IO,有一个准备好了,就不需要等了开始处理,提高了同时处理IO的能力。

    select几乎所有操作系统平台都支持,poll是对的select的升级。

    epoll,Linux系统内核2.5+开始支持,对select和poll的增强,在监视的基础上,增加回调机制。BSD、Mac平台有kqueue,Windows有iocp。

    以select为例,将关注的IO操作告诉select函数并调用,进程阻塞,内核“监视”select关注的文件描述符fd,被关注的任何一个fd对应的IO准备好了数据,select返回。再使用read将数据复制到用户进程。

    一般情况下,select最多能监听1024个fd(可以修改,但不建议改),但是由于select采用轮询的方式,当管理的IO多了,每次都要遍历全部fd,效率低下。

    epoll没有管理的fd的上限,且是回调机制,不需遍历,效率很高。

    异步IO

    进程发起异步IO请求,立即返回。内核完成IO的两个阶段,内核给进程发一个信号。

    Linux的aio的系统调用,内核从版本2.6开始支持

    Python 中 IO多路复用

    • IO多路复用

      • 大多数操作系统都支持select和poll

      • Linux 2.5+ 支持epoll

      • BSD、Mac支持kqueue

      • Windows的IOCP

    Python的select库

    实现了select、poll系统调用,这个基本上操作系统都支持。部分实现了epoll。

    底层的IO多路复用模块。

    开发中的选择

    1、完全跨平台,使用select、poll。但是性能较差

    2、针对不同操作系统自行选择支持的技术,这样做会提高IO处理的性能

    select维护一个文件描述符数据结构,单个进程使用有上限,通常是1024,线性扫描这个数据结构。效率低。

    pool和select的区别是内部数据结构使用链表,没有这个最大限制,但是依然是线性遍历才知道哪个设备就绪了。

    epool使用事件通知机制,使用回调机制提高效率。

    select/pool还要从内核空间复制消息到用户空间,而epoll通过内核空间和用户空间共享一块内存来减少复制。

    selectors库

    3.4版本提供selectors库,高级IO复用库

    类层次结构︰

    BaseSelector

    +-- SelectSelector 实现select

    +-- PollSelector 实现poll

    +-- EpollSelector 实现epoll

    +-- DevpollSelector 实现devpoll

    +-- KqueueSelector 实现kqueue

    selectors.DefaultSelector返回当前平台最有效、性能最高的实现。代码如下

    但是,由于没有实现Windows下的IOCP,所以,只能退化为select。

     
    # Choose the best implementation, roughly:
    #      epoll|kqueue|devpoll > poll > select.
    # select() also can't accept a FD > FD_SETSIZE (usually around 1024)
    if 'KqueueSelector' in globals():
        DefaultSelector = KqueueSelector
    elif 'EpollSelector' in globals():
        DefaultSelector = EpollSelector
    elif 'DevpollSelector' in globals():
        DefaultSelector = DevpollSelector
    elif 'PollSelector' in globals():
        DefaultSelector = PollSelector
    else:
        DefaultSelector = SelectSelector

    abstractmethod register(fileobj, events, data=None)

    为selector注册一个文件对象,监视它的IO事件。返回SelectKey对象。

    fileobj 被监视文件对象,例如socket对象

    events 事件,该文件对象必须等待的事件,读或者写,或者都读写

    data 可选的与此文件对象相关联的不透明数据,例如,关联用来存储每个客户端的会话ID,关联方法。通过这个参数在关注的事件产生后让selector干什么事。

    event常量

    EVENT_READ           可读 0b01,内核已经准备好输入输出设备,可以开始读了

    EVENT_WRITE         可写 0b10,内核准备好了,可以往里写了

    selectors.SelectorKey 有4个属性

    fileobj 注册的文件对象

    fd 文件描述符

    events 等待上面的文件描述符的文件对象的事件

    data 注册时关联的数据

    可以使用多线程配合select监视来实现之前的chat服务器。

    也可以使用IO多路复用实现,代码如下

    from queue import Queue
    import socket
    import threading
    import logging
    import selectors
    
    FORMAT = "%(asctime)s %(thread)s %(message)s"
    logging.basicConfig(format=FORMAT,level=logging.INFO)
    
    class Chatserver():
        def __init__(self, ip="127.0.0.1",port=9999):
            self.sock = socket.socket()
            self.selector = selectors.DefaultSelector() # 当前系统最优selector的实现
            self.event = threading.Event()
            self.addr = ip,port
    
        def start(self):
            self.sock.bind(self.addr)
            self.sock.listen()
            self.sock.setblocking(True) # 非阻塞模式
            key = self.selector.register(self.sock,selectors.EVENT_READ,data=self.accept) # 注册监控对象
            threading.Thread(target=self.select,name="select",daemon=True).start() # select 监控会阻塞程序,另起线程
    
        def select(self):
            while  not self.event.is_set():
                event = self.selector.select() # 开始监控
                for key, mask in event:
                    # logging.info(key)
                    # logging.info(mask)
                    callback = key.data # 监控到事件event就交给相应的data对象
                    callback(key)
    
        def accept(self,key:selectors.SelectorKey):
            conn, raddr = key.fileobj.accept()
            conn.setblocking(True)
            #logging.info(conn)
            key = self.selector.register(conn,selectors.EVENT_READ,data=(self.handle)) # 监控connect的socket对象的read
    
        def handle(self,key:selectors.SelectorKey):
            data = key.fileobj.recv(1024)
            if data.strip() == b"quit" or data == b"": # 客户端退出方式
                self.selector.unregister(key.fileobj) # 取消注册
                key.fileobj.close() # 关闭socket对象
                return
            msg = "{}:{}
    {}
    ".format(*key.fileobj.getpeername(),data.decode())
            logging.info(msg)
            msg = msg.encode()
            for key in self.selector.get_map().values(): # 群发
                if key.data == self.handle: # 过滤类的accept的socket对象
                    key.fileobj.send(msg)
    
        def stop(self): # 关闭方法
            self.event.set()
            fileobjs = []
            for fd,key in self.selector.get_map().items():
                fileobjs.append(key.fileobj)
            for fobj in fileobjs:
                self.selector.unregister(fobj)
                fobj.close()
            self.selector.close()
    
    
    def main():
        server = Chatserver()
        server.start()
        while True:
            cmd =input(">>>")
            if cmd.strip() == "quit":
                server.stop()
                break
            print(threading.enumerate())
    
    if __name__ == '__main__':
        main()

    send是写操作,也可以让selector监听,
    self.selector.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, self.recv)
    注册语句,要监听selectors.EVENT_READ | selectors.EVENT_WRITE 读与写事件。
    回调的时候,需要mask来判断究竟是读触发还是写触发了。所以,需要修改方法声明,增加mask。
    def recv(self, sock, mask) 但是由于recv 方法处理读和写事件,所以叫recv不太合适,改名为
    def handle(self, sock, mask)

    handle方法里面处理读、写,mask有可能是0b01、0b10、0b11

    为每一个与客户端连接的socket对象增加对应的队列。
    与每一个客户端连接的socket对象,自己维护一个队列,某一个客户端收到信息后,会遍历发给所有客户端的队列。这里完成一对多,即一份数据放到了所有队列中。
    与每一个客户端连接的socket对象,发现自己队列有数据,就发送给客户端。

    注意读和写是分离的,那么代码改为如下

    from queue import Queue
    import socket
    import threading
    import logging
    import selectors
    
    FORMAT = "%(asctime)s %(thread)s %(message)s"
    logging.basicConfig(format=FORMAT,level=logging.INFO)
    
    class Chatserver():
        def __init__(self, ip="127.0.0.1",port=9999):
            self.sock = socket.socket()
            self.selector = selectors.DefaultSelector()
            self.event = threading.Event()
            self.addr = ip,port
    
        def start(self):
            self.sock.bind(self.addr)
            self.sock.listen()
            self.sock.setblocking(True)
            key = self.selector.register(self.sock,selectors.EVENT_READ,data=self.accept)
            threading.Thread(target=self.select,name="select",daemon=True).start()
    
        def select(self):
            while  not self.event.is_set():
                event = self.selector.select()
                for key, mask in event:
                    # logging.info(key)
                    # logging.info(mask)
                    if callable(key.data):
                        callback = key.data
                        callback(key)
                    else:
                        callback = key.data[0]
                        callback(key,mask)
    
        def accept(self,key:selectors.SelectorKey):
            conn, raddr = key.fileobj.accept()
            conn.setblocking(True)
            #logging.info(conn)
            key = self.selector.register(conn,selectors.EVENT_READ | selectors.EVENT_WRITE,data=(self.handle,Queue()))
            #self.recv(conn)
    
        def handle(self,key:selectors.SelectorKey,mask):
            if mask & selectors.EVENT_READ:
                data = key.fileobj.recv(1024)
                if data.strip() == b"quit" or data == b"":
                    self.selector.unregister(key.fileobj)
                    key.fileobj.close()
                    return
                msg = "{}:{}
    {}
    ".format(*key.fileobj.getpeername(),data.decode())
                logging.info(msg)
                msg = msg.encode()
                for key in self.selector.get_map().values():
                    if isinstance(key.data,tuple):
                        key.data[1].put(msg)
            if mask & selectors.EVENT_WRITE:
                if not key.data[1].empty():
                    key.fileobj.send(key.data[1].get())
    
        def stop(self):
            self.event.set()
            fileobjs = []
            for fd,key in self.selector.get_map().items():
                fileobjs.append(key.fileobj)
            for fobj in fileobjs:
                self.selector.unregister(fobj)
                fobj.close()
            self.selector.close()
    
    
    def main():
        server = Chatserver()
        server.start()
        while True:
            cmd =input(">>>")
            if cmd.strip() == "quit":
                server.stop()
                break
            print(threading.enumerate())
    
    if __name__ == '__main__':
        main()

    这个程序最大的问题,在select()一直判断可写,几乎一直循环不停。所以对于写不频繁的情况下,就不要监听EVENT_WRITE。

    对于Server来说,一般来说,更多的是等待对方发来数据后响应时才发出数据,而不是积极的等着发送数据。所以监听EVENT_READ,收到数据之后再发送就可以了

    还可以适当增加功能

  • 相关阅读:
    2.4学习
    2.3学习
    2.2学习
    2.1学习
    公文流转系统 模拟
    《GCC编译器的使用以及静态库和动态库的制作与使用》
    《驱动调试
    《海思3521D
    《驱动调试
    《驱动调试
  • 原文地址:https://www.cnblogs.com/rprp789/p/9917558.html
Copyright © 2011-2022 走看看