zoukankan      html  css  js  c++  java
  • I/O 多路复用之select、poll、epoll详解

      select,poll,epoll都是IO多路复用的机制。I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。


    1.select

      select 函数监视的文件描述符分3类,分别是writefds、readfds、和exceptfds。调用后select函数会阻塞,直到有描述副就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。当select函数返回后,可以 通过遍历fdset,来找到就绪的描述符。

      select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。select的一 个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样也会造成效率的降低。

      select有3个缺点:

        每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大。

        每次调用select后,都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大。

        fd数量有限,默认1024。

     python select格式:
    rList,wList,eList = select.select(argv1,argv2,argv3,timeout)
    
    参数:
        argv1 标准输入
        argv2 如果监听序列中句柄发生变化 则将变化句柄返回至wList
        argv3 如果监听序列中句柄有错误时 则将错误句柄返回到eList
        timeout 设置阻塞时间,如果为2那么将阻塞2s,如果不设置则默认一直阻塞,直到监听的描述符发生变化

     测试代码:客户端发送任何内容,服务端会原模原样返回

    import socket
    import select
    import queue
    
    
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setblocking(False)
    server_address = ('localhost', 1234)
    server.bind(server_address)
    server.listen(5)
    
    
    inputs = [server] # 读事件
    outputs = []  # 写事件
    exceptions = []  # 异常事件
    msg_queues = {} #每个socket有一个发送消息的队列
    
    
    print("server is listening on %s:%s." % server_address)
    while inputs:
        #  第四个参数是timeout,可选,表示n秒内没有任何事件通知,就执行下面代码
        readable, writable, exceptional = select.select(inputs, outputs, exceptions)
        for sock in readable:
            #  client向server发起connect也是读事件,server accept后产生socket加入读队列中
            if sock is server:
                conn, addr = sock.accept()
                conn.setblocking(False)
                inputs.append(conn)
                msg_queues[conn] = queue.Queue()
                print("server accepts a conn.")
            else:
                #  读取client发过来的数据,最多读取1k byte。
                data = sock.recv(1024)
                #  将收到的数据返回给client
                if data:
                    msg_queues[sock].put(data)
                    if sock not in outputs:
                        #  下次select的时候会触发写事件通知,写和读事件不太一样,前者是可写就会触发事件,并不一定要真的去写
                        outputs.append(sock)
                else:
                    #  client传过来的消息为空,说明已断开连接
                    print("server closes a conn.")
                    if sock in outputs:
                        outputs.remove(sock)
                    inputs.remove(sock)
                    sock.close()
                    del msg_queues[sock]
        for sock in writable:
            if not msg_queues[sock].empty():
                sock.send(msg_queues[sock].get_nowait())
            if msg_queues[sock].empty():
                outputs.remove(sock)
        for sock in exceptional:
            inputs.remove(sock)
            if sock in outputs:
                outputs.remove(sock)
            sock.close()
            del msg_queues[sock]
    服务端
    import socket
    ip_port = ('localhost', 1234)
    
    c = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    c.connect(ip_port)
    while True:
        inp = input(">>>>:").strip()
        if not inp:
            continue
        c.send(inp.encode('utf-8'))  # 数据发给服务端,先进行编码
        data = c.recv(1024)
        print(data.decode('utf-8'))  # 接收到服务端返回的数据,进行解码
    
    c.close()
    客户端

      
    2.poll

     poll本质上和select没有区别,只是没有了最大连接数(linux上默认1024个)的限制,原因是它基于链表存储的。 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符。

     从上面看,select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。

     在python中调用poll

    3.epoll

      epoll是在2.6内核中提出的,是之前的select和poll的增强版本。相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。

      

    在 select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一 个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait() 时便得到通知。(此处去掉了遍历文件描述符,而是通过监听回调的的机制。这正是epoll的魅力所在。)

    epoll的优点主要是一下几个方面:
      1. 监视的描述符数量不受限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左 右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。select的最大缺点就是进程打开的fd是有数量限制的。这对 于连接数量比较大的服务器来说根本不能满足。虽然也可以选择多进程的解决方案( Apache就是这样实现的),不过虽然linux上面创建进程的代价比较小,但仍旧是不可忽视的,加上进程间数据同步远比不上线程间同步的高效,所以也不是一种完美的方案。

      2.IO的效率不会随着监视fd的数量的增长而下降。epoll不同于select和poll轮询的方式,而是通过每个fd定义的回调函数来实现的。只有就绪的fd才会执行回调函数。

      3.如果没有大量的idle -connection或者dead-connection,epoll的效率并不会比select/poll高很多,但是当遇到大量的idle- connection,就会发现epoll的效率大大高于select/poll。

    import socket
    
    EOL1 = b'
    
    '
    EOL2 = b'
    
    '
    response  = b'HTTP/1.0 200 OK
    Date: Mon, 1 Jan 1996 01:01:01 GMT
    '
    response += b'Content-Type: text/plain
    Content-Length: 13
    
    '
    response += b'Hello, world!'
    
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    serversocket.bind(('0.0.0.0', 8080))
    serversocket.listen(1)
    
    try:
       while True:
          connectiontoclient, address = serversocket.accept()
          request = b''
          while EOL1 not in request and EOL2 not in request:
              request += connectiontoclient.recv(1024)
          print('-'*40 + '
    ' + request.decode()[:-2])
          connectiontoclient.send(response)
          connectiontoclient.close()
    finally:
       serversocket.close()
    阻塞式socket通讯

      上述代码的socket采用的是阻塞方式, 因为python解释器在出现事件之前都处在停止状态。

      当一个程序采用阻塞socket的时候, 它经常采用一个线程(甚至一个进程)一个socket通讯的模式. 主线程保留服务器监听socket, 接受进来的连接, 一次接受一个连接, 然后把生成的socket交给一个分离的线程去做交互. 因为一个线程只和一个客户端通讯, 在任何位置的阻塞都不会造成问题. 阻塞本身不会影响其他线程的工作.

      Linux 2.6有一些方式来管理异步socket, python API能够用的有3种: select, poll和epoll. epoll和poll比select性能更好, 因为python程序不需要为了特定的事件去查询单独的socket, 而是依赖操作系统来告诉你什么socket产生了什么事件. epoll比poll性能更好, 因为它不需要每次python程序查询的时候, 操作系统都去检查所有的socket, 在事件产生的时候, linux跟踪他们, 然后在python程序调用的时候, 返回具体的列表. 所以epoll在大量(上千)并行连接下, 是一种更有效率, 伸缩性更强的机制.

      采用epoll的异步socket编程示例:

    采用epoll的程序一般这样操作:
    
    建立一个epoll对象 
    告诉epoll对象, 对于一些socket监控一些事件. 
    问epoll, 从上次查询以来什么socket产生了什么事件. 
    针对这些socket做特定操作. 
    告诉epoll, 修改监控socket和/或监控事件. 
    重复第3步到第5步, 直到结束. 
    销毁epoll对象. 
    采用异步socket的时候第3步重复了第2步的事情. 这里的程序更复杂, 因为一个线程需要和多个客户端交互.
    #!/usr/bin/env python
    #-*- coding:utf-8 -*-
    
    import socket
    
    #创建客户端socket对象
    clientsocket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    #服务端IP地址和端口号元组
    server_address = ('127.0.0.1',8888)
    #客户端连接指定的IP地址和端口号
    clientsocket.connect(server_address)
    
    while True:
        #输入数据
        data = raw_input('please input:')
        #客户端发送数据
        clientsocket.sendall(data)
        #客户端接收数据
        server_data = clientsocket.recv(1024)
        print '客户端收到的数据:'server_data
        #关闭客户端socket
        clientsocket.close() 
    客户端
    #!/usr/bin/env python
    #-*- coding:utf-8 -*-
    
    import socket
    import select
    import Queue
    
    #创建socket对象
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    #设置IP地址复用
    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    #ip地址和端口号
    server_address = ("127.0.0.1", 8888)
    #绑定IP地址
    serversocket.bind(server_address)
    #监听,并设置最大连接数
    serversocket.listen(10)
    print  "服务器启动成功,监听IP:" , server_address
    #服务端设置非阻塞
    serversocket.setblocking(False)  
    #超时时间
    timeout = 10
    #创建epoll事件对象,后续要监控的事件添加到其中
    epoll = select.epoll()
    #注册服务器监听fd到等待读事件集合
    epoll.register(serversocket.fileno(), select.EPOLLIN)
    #保存连接客户端消息的字典,格式为{}
    message_queues = {}
    #文件句柄到所对应对象的字典,格式为{句柄:对象}
    fd_to_socket = {serversocket.fileno():serversocket,}
    
    while True:
      print "等待活动连接......"
      #轮询注册的事件集合,返回值为[(文件句柄,对应的事件),(...),....]
      events = epoll.poll(timeout)
      if not events:
         print "epoll超时无活动连接,重新轮询......"
         continue
      print "" , len(events), "个新事件,开始处理......"
      
      for fd, event in events:
         socket = fd_to_socket[fd]
         #如果活动socket为当前服务器socket,表示有新连接
         if socket == serversocket:
                connection, address = serversocket.accept()
                print "新连接:" , address
                #新连接socket设置为非阻塞
                connection.setblocking(False)
                #注册新连接fd到待读事件集合
                epoll.register(connection.fileno(), select.EPOLLIN)
                #把新连接的文件句柄以及对象保存到字典
                fd_to_socket[connection.fileno()] = connection
                #以新连接的对象为键值,值存储在队列中,保存每个连接的信息
                message_queues[connection]  = Queue.Queue()
         #关闭事件
         elif event & select.EPOLLHUP:
            print 'client close'
            #在epoll中注销客户端的文件句柄
            epoll.unregister(fd)
            #关闭客户端的文件句柄
            fd_to_socket[fd].close()
            #在字典中删除与已关闭客户端相关的信息
            del fd_to_socket[fd]
         #可读事件
         elif event & select.EPOLLIN:
            #接收数据
            data = socket.recv(1024)
            if data:
               print "收到数据:" , data , "客户端:" , socket.getpeername()
               #将数据放入对应客户端的字典
               message_queues[socket].put(data)
               #修改读取到消息的连接到等待写事件集合(即对应客户端收到消息后,再将其fd修改并加入写事件集合)
               epoll.modify(fd, select.EPOLLOUT)
         #可写事件
         elif event & select.EPOLLOUT:
            try:
               #从字典中获取对应客户端的信息
               msg = message_queues[socket].get_nowait()
            except Queue.Empty:
               print socket.getpeername() , " queue empty"
               #修改文件句柄为读事件
               epoll.modify(fd, select.EPOLLIN)
            else :
               print "发送数据:" , data , "客户端:" , socket.getpeername()
               #发送数据
               socket.send(msg)
    
    #在epoll中注销服务端文件句柄
    epoll.unregister(serversocket.fileno())
    #关闭epoll
    epoll.close()
    #关闭服务器socket
    serversocket.close()
    服务端
  • 相关阅读:
    ubuntu 用shell脚本实现将当前文件夹下全部文件夹中的某一类文件复制到同一文件夹下
    读书笔记-2java虚拟机的可达性算法与finalize方法
    find the longest of the shortest (hdu 1595 SPFA+枚举)
    杭电 2176 取(m堆)石子游戏(博弈)
    MVC框架的优缺点
    Wireshark-TCP协议分析(包结构以及连接的建立和释放)
    Ubuntu安装教程--Win7系统中含100M保留分区
    eclipse新建android项目出现非常多错误
    关于简单的加密和解密算法
    在一台server上部署多个Tomcat
  • 原文地址:https://www.cnblogs.com/vipchenwei/p/7202452.html
Copyright © 2011-2022 走看看