zoukankan      html  css  js  c++  java
  • python 套接字之select poll epoll

    python下的select模块使用 以及epoll与select、poll的区别

    先说epoll与select、poll的区别(总结)


    select, poll, epoll 都是I/O多路复用的具体的实现,之所以有这三个鬼存在,其实是他们出现是有先后顺序的。
    I/O多路复用这个概念被提出来以后, select是第一个实现 (1983 左右在BSD里面实现的)。


    select

    select 被实现以后,很快就暴露出了很多问题。

    • select 会修改传入的参数数组,这个对于一个需要调用很多次的函数,是非常不友好的。
    • 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
    • select 如果任何一个sock(I/O stream)出现了数据,select仅仅会返回,但是并不会告诉你是那个sock上有数据,于是你只能自己一个一个的找,)每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大
    • select 只能监视1024个链接, 这个跟草榴没啥关系哦,linux 定义在头文件中的,参见FD_SETSIZE。
    • select 不是线程安全的,如果你把一个sock加入到select, 然后突然另外一个线程发现,尼玛,这个sock不用,要收回。对不起,这个select 不支持的,如果你丧心病狂的竟然关掉这个sock, select的标准行为是。。呃。。不可预测的,

    于是14年以后(1997年)一帮人又实现了poll, poll 修复了select的很多问题


    poll

    • poll 去掉了1024个链接的限制,于是要多少链接呢, 主人你开心就好。
    • poll 从设计上来说,不再修改传入数组,不过这个要看你的平台了,所以行走江湖,还是小心为妙。

    其实拖14年那么久也不是效率问题, 而是那个时代的硬件实在太弱,一台服务器处理1千多个链接简直就是神一样的存在了,select很长段时间已经满足需求。
    但是poll仍然不是线程安全的, 这就意味着,不管服务器有多强悍,你也只能在一个线程里面处理一组I/O流。你当然可以那多进程来配合了,不过然后你就有了多进程的各种问题。
    于是5年以后, 在2002, 大神 Davide Libenzi 实现了epoll.


    epoll

    epoll 可以说是I/O 多路复用最新的一个实现,epoll 修复了poll 和select绝大部分问题, 比如:

    • 对于每次需要将FD从用户态拷贝至内核态,epoll的解决方案在epoll_ctl函数中。每次注册新的事件到epoll句柄中时(在epoll_ctl中指定EPOLL_CTL_ADD),会把所有的fd拷贝进内核,而不是在epoll_wait的时候重复拷贝。epoll保证了每个fd在整个过程中只会拷贝一次。
    • 同样epoll也没有1024的连接数限制
    • epoll 现在是线程安全的。
    • epoll 现在不仅告诉你sock组里面数据,还会告诉你具体哪个sock有数据,你不用自己去找了。
      • epoll的解决方案不像select或poll一样每次都把current轮流加入fd对应的设备等待队列中,而只在epoll_ctl时把current挂一遍(这一遍必不可少)并为每个fd指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调函数,而这个回调函数会把就绪的fd加入一个就绪链表)。epoll_wait的工作实际上就是在这个就绪链表中查看有没有就绪的fd(利用schedule_timeout()实现睡一会,判断一会的效果,和select实现中的第7步是类似的)。

    总结

    (1)select,poll实现需要自己不断轮询所有fd集合,直到设备就绪,期间可能要睡眠和唤醒多次交替。而epoll其实也需要调用epoll_wait不断轮询就绪链表,期间也可能多次睡眠和唤醒交替,但是它是设备就绪时,调用回调函数,把就绪fd放入就绪链表中,并唤醒在epoll_wait中进入睡眠的进程。虽然都要睡眠和交替,但是select和poll在“醒着”的时候要遍历整个fd集合,而epoll在“醒着”的时候只要判断一下就绪链表是否为空就行了,这节省了大量的CPU时间。这就是回调机制带来的性能提升。
    (2)select,poll每次调用都要把fd集合从用户态往内核态拷贝一次,并且要把current往设备等待队列中挂一次,而epoll只要一次拷贝,而且把current往等待队列上挂也只挂一次(在epoll_wait的开始,注意这里的等待队列并不是设备等待队列,只是一个epoll内部定义的等待队列)。这也能节省不少的开销。


    python下的一个select实例
    转自http://www.cnblogs.com/coser/archive/2012/01/06/2315216.html

    server

    import select
    import socket
    import Queue
    
    #create a socket
    server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    server.setblocking(False)
    #set option reused
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR  , 1)
    
    server_address= ('192.168.1.102',10001)
    server.bind(server_address)
    
    server.listen(10)
    
    #sockets from which we except to read
    inputs = [server]
    
    #sockets from which we expect to write
    outputs = []
    
    #Outgoing message queues (socket:Queue)
    message_queues = {}
    
    #A optional parameter for select is TIMEOUT
    timeout = 20
    
    while inputs:
        print "waiting for next event"
        readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout)
    
        # When timeout reached , select return three empty lists
        if not (readable or writable or exceptional) :
            print "Time out ! "
            break;   
        for s in readable :
            if s is server:
                # A "readable" socket is ready to accept a connection
                connection, client_address = s.accept()
                print "    connection from ", client_address
                connection.setblocking(0)
                inputs.append(connection)
                message_queues[connection] = Queue.Queue()
            else:
                data = s.recv(1024)
                if data :
                    print " received " , data , "from ",s.getpeername()
                    message_queues[s].put(data)
                    # Add output channel for response   
                    if s not in outputs:
                        outputs.append(s)
                else:
                    #Interpret empty result as closed connection
                    print "  closing", client_address
                    if s in outputs :
                        outputs.remove(s)
                    inputs.remove(s)
                    s.close()
                    #remove message queue
                    del message_queues[s]
        for s in writable:
            try:
                next_msg = message_queues[s].get_nowait()
            except Queue.Empty:
                print " " , s.getpeername() , 'queue empty'
                outputs.remove(s)
            else:
                print " sending " , next_msg , " to ", s.getpeername()
                s.send(next_msg)
    
        for s in exceptional:
            print " exception condition on ", s.getpeername()
            #stop listening for input on the connection
            inputs.remove(s)
            if s in outputs:
                outputs.remove(s)
            s.close()
            #Remove message queue
            del message_queues[s]

    client

    import socket
    
    messages = ["This is the message" ,
                "It will be sent" ,
                "in parts "]
    
    print "Connect to the server"
    
    server_address = ("192.168.1.102",10001)
    
    #Create a TCP/IP sock
    
    socks = []
    
    for i in range(10):
        socks.append(socket.socket(socket.AF_INET,socket.SOCK_STREAM))
    
    for s in socks:
        s.connect(server_address)
    
    counter = 0
    for message in messages :
        #Sending message from different sockets
        for s in socks:
            counter+=1
            print "  %s sending %s" % (s.getpeername(),message+" version "+str(counter))
            s.send(message+" version "+str(counter))
        #Read responses on both sockets
        for s in socks:
            data = s.recv(1024)
            print " %s received %s" % (s.getpeername(),data)
            if not data:
                print "closing socket ",s.getpeername()
                s.close()
    

      

    poll server

    import socket
    import select
    import Queue
    
    # Create a TCP/IP socket, and then bind and listen
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setblocking(False)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_address = ("192.168.1.102", 10001)
    
    print  "Starting up on %s port %s" % server_address
    server.bind(server_address)
    server.listen(5)
    message_queues = {}
    #The timeout value is represented in milliseconds, instead of seconds.
    timeout = 1000
    # Create a limit for the event
    READ_ONLY = ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)
    READ_WRITE = (READ_ONLY|select.POLLOUT)
    # Set up the poller
    poller = select.poll()
    poller.register(server,READ_ONLY)
    #Map file descriptors to socket objects
    fd_to_socket = {server.fileno():server,}
    while True:
        print "Waiting for the next event"
        events = poller.poll(timeout)
        print "*"*20
        print len(events)
        print events
        print "*"*20
        for fd ,flag in  events:
            s = fd_to_socket[fd]
            if flag & (select.POLLIN | select.POLLPRI) :
                if s is server :
                    # A readable socket is ready to accept a connection
                    connection , client_address = s.accept()
                    print " Connection " , client_address
                    connection.setblocking(False)
    
                    fd_to_socket[connection.fileno()] = connection
                    poller.register(connection,READ_ONLY)
    
                    #Give the connection a queue to send data
                    message_queues[connection]  = Queue.Queue()
                else :
                    data = s.recv(1024)
                    if data:
                        # A readable client socket has data
                        print "  received %s from %s " % (data, s.getpeername())
                        message_queues[s].put(data)
                        poller.modify(s,READ_WRITE)
                    else :
                        # Close the connection
                        print "  closing" , s.getpeername()
                        # Stop listening for input on the connection
                        poller.unregister(s)
                        s.close()
                        del message_queues[s]
            elif flag & select.POLLHUP :
                #A client that "hang up" , to be closed.
                print " Closing ", s.getpeername() ,"(HUP)"
                poller.unregister(s)
                s.close()
            elif flag & select.POLLOUT :
                #Socket is ready to send data , if there is any to send
                try:
                    next_msg = message_queues[s].get_nowait()
                except Queue.Empty:
                    # No messages waiting so stop checking
                    print s.getpeername() , " queue empty"
                    poller.modify(s,READ_ONLY)
                else :
                    print " sending %s to %s" % (next_msg , s.getpeername())
                    s.send(next_msg)
            elif flag & select.POLLERR:
                #Any events with POLLERR cause the server to close the socket
                print "  exception on" , s.getpeername()
                poller.unregister(s)
                s.close()
                del message_queues[s]
    

      

  • 相关阅读:
    Mime Types
    对MySQL DELETE语法的详细解析
    创建MySQL存储过程示例
    Python安装
    python学习之matplotlib实战
    python学习之numpy实战
    SQLserver建表规则
    机器学习之BP神经网络
    机器学习之决策树算法
    Spring学习总结
  • 原文地址:https://www.cnblogs.com/MY0213/p/8998719.html
Copyright © 2011-2022 走看看