  • Dissecting epoll

    【01】What is epoll?

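      As the number of Internet users kept growing, the traditional network I/O models could no longer handle large numbers of concurrent requests, and each operating system came up with its own answer.

    Linux's answer is epoll. epoll is a system interface that, unlike select or poll, does not scan the entire set of monitored descriptors on every call; it only walks the descriptors that kernel I/O events have asynchronously woken up and placed on the ready list.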

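    【02】How epoll works

          We will walk through the code step by step to see how epoll works in network I/O.

    epoll can notify us when the state of a file descriptor changes; these notifications are called events.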

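    events can be any bitwise OR of the following macros:
    EPOLLIN: the file descriptor is readable (this includes the peer closing the socket normally);
    EPOLLOUT: the file descriptor is writable;
    EPOLLPRI: the file descriptor has urgent data to read (i.e. out-of-band data has arrived);
    EPOLLERR: an error occurred on the file descriptor;
    EPOLLHUP: the file descriptor was hung up.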
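    Because events is a bit mask, each condition is tested with a bitwise AND. The code below does the same thing in `run()` with `select.EPOLLHUP & events`. What follows is a minimal Python 3 sketch. It only works on Linux, because `select.epoll` and these constants exist only there. `describe_events` is a hypothetical helper name, not part of any library.

```python
import select

# (name, bit) pairs for the events listed above; these constants are Linux-only.
EVENT_BITS = [
    ("EPOLLIN", select.EPOLLIN),
    ("EPOLLOUT", select.EPOLLOUT),
    ("EPOLLPRI", select.EPOLLPRI),
    ("EPOLLERR", select.EPOLLERR),
    ("EPOLLHUP", select.EPOLLHUP),
]


def describe_events(mask):
    """Return the names of all event bits set in an epoll events mask."""
    return [name for name, bit in EVENT_BITS if mask & bit]


# Masks are plain integers: events are combined with | and tested with &.
print(describe_events(select.EPOLLIN | select.EPOLLHUP))  # ['EPOLLIN', 'EPOLLHUP']
```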

    We can put the file descriptors created for connections into epoll, let epoll watch them for events, and then handle each event as it arrives.
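    Here is a minimal, self-contained sketch of that flow in Python 3. It assumes Linux and lets the OS pick a free localhost port. Both the listening socket and the accepted connection are registered for `EPOLLIN`, and `poll()` returns only the descriptors that actually have events. It is an illustration, not the article's server.

```python
import select
import socket

# Listening socket on localhost; port 0 lets the OS pick a free port.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener.bind(("127.0.0.1", 0))
listener.listen(10)
listener.setblocking(False)

ep = select.epoll()
# For a listening socket, "readable" means a connection is waiting to be accepted.
ep.register(listener.fileno(), select.EPOLLIN)

# A client connects and sends a few bytes.
client = socket.create_connection(listener.getsockname())
client.sendall(b"hello")

conns = {}  # fd -> accepted socket
received = b""
for _ in range(10):  # bounded so a mistake cannot hang the loop forever
    if received == b"hello":
        break
    # poll() waits up to 1 s and returns (fd, events) only for fds that have events.
    for fd, events in ep.poll(1):
        if fd == listener.fileno():
            conn, _addr = listener.accept()
            conn.setblocking(False)
            conns[conn.fileno()] = conn
            ep.register(conn.fileno(), select.EPOLLIN)
        elif events & select.EPOLLIN:
            received += conns[fd].recv(1024)

# Clean up: unregister before closing, as the article's close() does.
for fd, conn in conns.items():
    ep.unregister(fd)
    conn.close()
client.close()
ep.close()
listener.close()
print(received)  # b'hello'
```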

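    events can also carry two flags that change how events are delivered:

      EPOLLET: puts the fd into edge-triggered (ET) mode, as opposed to the default level-triggered (LT) mode. In LT mode epoll keeps reporting an fd on every call for as long as it stays ready (for example, while unread data remains). In ET mode it reports the fd only when its state changes, so the handler must keep reading or writing until the call fails with EAGAIN.
      EPOLLONESHOT: reports only one event. After that event has been delivered the fd is disabled; to keep monitoring the socket you must re-arm it with EPOLL_CTL_MOD (epoll.modify() in Python).

      Together, these two flags control how epoll triggers events.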
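    The difference between the modes is easiest to see when data is left unread. The Python 3 sketch below is Linux-only, and `count_reports` and `oneshot_rearm` are hypothetical helper names. It writes one byte into a `socket.socketpair()` and polls twice without reading it. Level-triggered mode reports the fd both times; edge-triggered and one-shot report it only once. A one-shot fd is reported again only after `modify()` re-arms it.

```python
import select
import socket


def count_reports(flags, polls=2):
    """Send one byte, never read it, and count how many polls report the reader."""
    reader, writer = socket.socketpair()
    ep = select.epoll()
    try:
        ep.register(reader.fileno(), select.EPOLLIN | flags)
        writer.send(b"x")  # stays unread in reader's buffer the whole time
        hits = 0
        for _ in range(polls):
            if ep.poll(0.1):
                hits += 1
        return hits
    finally:
        ep.close()
        reader.close()
        writer.close()


def oneshot_rearm():
    """Show that an EPOLLONESHOT fd is silent until modify() re-arms it."""
    reader, writer = socket.socketpair()
    ep = select.epoll()
    try:
        mask = select.EPOLLIN | select.EPOLLONESHOT
        ep.register(reader.fileno(), mask)
        writer.send(b"x")
        first = bool(ep.poll(0.1))   # reported once
        second = bool(ep.poll(0.1))  # disabled after that first report
        ep.modify(reader.fileno(), mask)  # re-arm (EPOLL_CTL_MOD)
        third = bool(ep.poll(0.1))   # byte is still unread, so it is reported again
        return first, second, third
    finally:
        ep.close()
        reader.close()
        writer.close()


print(count_reports(0))                    # 2: level-triggered reports it on every poll
print(count_reports(select.EPOLLET))       # 1: edge-triggered reports only the change
print(count_reports(select.EPOLLONESHOT))  # 1: one-shot disables the fd after one report
print(oneshot_rearm())                     # (True, False, True)
```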

    ----------------------------------------------------------------------------------------------------------------------------------------------------

    [1] Registering file descriptors

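      We split the file descriptors into two kinds:

        1 The listening socket's file descriptor

          There is only one of these: the listening socket itself. Conceptually it alternates between listening and accepting. When a connection arrives it accepts it, then goes back to listening for the next one. In the code its state is always "accept", because every EPOLLIN event on it means a new connection is waiting.

        2 Connection file descriptors (not a perfectly precise name): every time a new connection arrives, accept() creates a new file descriptor. Linux hands out the lowest free number, so in a fresh process new descriptors usually start at 3 (0, 1 and 2 are stdin, stdout and stderr). In this program the listening socket and the epoll instance typically take the first free numbers, so client connections come after them. In the code, a connection's state

        is one of accept, read, process, write and closing. A connection usually goes accept --> read --> process --> write --> closing, but this varies. A persistent (keep-alive) connection goes back to read after the write completes, and when something goes wrong the connection goes straight to closing.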
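    The lifecycle above can be written down as a transition table. This is a hypothetical Python 3 illustration, not code from the article. It is derived from how the handlers in the code below hand off to each other, and it treats `process` as its own step even though `read2process` calls `process()` immediately. "closing" is allowed from every state because any error ends the connection.

```python
# Which state may follow which; "closing" is reachable from every state on error.
TRANSITIONS = {
    "accept": {"read", "closing"},
    "read": {"read", "process", "closing"},   # stay in read until a full message arrives
    "process": {"write", "read", "closing"},  # back to read if the logic returns nothing
    "write": {"write", "read", "closing"},    # write until done, then read (keep-alive)
    "closing": set(),                         # terminal state
}


def is_valid_lifecycle(states):
    """Return True if every consecutive pair of states is an allowed transition."""
    return all(nxt in TRANSITIONS[cur] for cur, nxt in zip(states, states[1:]))


keep_alive = ["accept", "read", "process", "write", "read", "process", "write", "closing"]
print(is_valid_lifecycle(keep_alive))                  # True
print(is_valid_lifecycle(["accept", "read", "write"]))  # False: write needs process first
```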

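    [2] Handling states with a state machine:

      When the right way to handle an event depends on what happened before, a state machine is the natural tool. In our code, the state machine looks at the connection's current state and uses it to decide which handler to run.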
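    The dispatch idea reduces to a dictionary that maps each state name to a handler, which is what `self.sm` does in the code below. This is a stripped-down Python 3 sketch with hypothetical handler names. Each handler only records the order in which it ran, so the control flow is visible.

```python
class Connection:
    """Hypothetical per-connection record (nbNet keeps a STATE object instead)."""

    def __init__(self):
        self.state = "read"
        self.trace = []  # order in which handlers ran


def on_read(conn):
    conn.trace.append("read")
    conn.state = "process"  # pretend a complete message was read


def on_process(conn):
    conn.trace.append("process")
    conn.state = "write"  # pretend the logic produced a reply


def on_write(conn):
    conn.trace.append("write")
    conn.state = "closing"  # pretend the client is done


def on_closing(conn):
    conn.trace.append("closing")
    conn.state = None  # nothing left to do


# State name -> handler, the same shape as nbNet's self.sm dictionary.
HANDLERS = {
    "read": on_read,
    "process": on_process,
    "write": on_write,
    "closing": on_closing,
}


def state_machine(conn):
    """Run the handler selected by the connection's current state."""
    HANDLERS[conn.state](conn)


c = Connection()
while c.state is not None:  # in nbNet each step is triggered by an epoll event instead
    state_machine(c)
print(c.trace)  # ['read', 'process', 'write', 'closing']
```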

    Strictly speaking, our code does just these two things:

    #!/usr/bin/env python
    # coding: utf-8
    
    from daemon import Daemon
    import socket
    import select
    import time
    import pdb
    
    __all__ = ["nbNet", "sendData_mh"]
    #DEBUG = True
    
    from nbNetUtils import *
    
    class nbNetBase:
        '''
            Base class: a set of small methods, each doing a single job.
        '''
        
        def setFd(self, sock):
            """sock is class object of socket"""
            #dbgPrint("\n -- setFd start!")
            # (re)create a fresh STATE record for this socket, keyed by its fd
            tmp_state = STATE()
            tmp_state.sock_obj = sock
            self.conn_state[sock.fileno()] = tmp_state
            #self.conn_state[sock.fileno()].printState()
            #dbgPrint("\n -- setFd end!")
    
        def accept(self, fd):
            """fd is fileno() of socket"""
            print "FD ",fd,"-------- accept and setblocking",'state:',self.conn_state[fd].state
            #dbgPrint("\n -- accept start!")
            '''
                Fetch the listening socket stored in conn_state and accept()
                the pending client connection.
            '''
            sock_state = self.conn_state[fd]
            sock = sock_state.sock_obj
            conn, addr = sock.accept()
            # put the new connection in non-blocking mode (0 == non-blocking)
            conn.setblocking(0)
            print conn.fileno(),"conn fileno"
            # return the socket of the newly connected client
            return conn
        
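        def close(self, fd):
            """fd is fileno() of socket"""
            try:
                # stop listening for events on this fd
                # get the connection's socket object
                sock = self.conn_state[fd].sock_obj
                # unregister from epoll before closing: epoll only drops an fd by itself
                # once every reference to the underlying open file has been closed
                self.epoll_sock.unregister(fd)
                # close the socket
                sock.close()
                # drop this fd's record from the dictionary
                self.conn_state.pop(fd)
                #tmp_pipe = self.popen_pipe
                #self.popen_pipe = 0
                #tmp_pipe.close()
            except:
                #dbgPrint("Close fd: %s abnormal" % fd)
                pass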
        #@profile
        def read(self, fd):
            """fd is fileno() of socket"""
            #pdb.set_trace()
            try:
                # fetch this connection's state record
                sock_state = self.conn_state[fd]
                # get the socket object
                conn = sock_state.sock_obj
                # Protocol check: every message starts with a 10-byte header such as
                # 0000000003, which tells the receiver that the body is 3 bytes long.
                # need_read starts at 10; once the header is read it becomes 3, and then
                # 3 more bytes are read. For a well-behaved client it is never 0 here.
                if sock_state.need_read <= 0:
                    raise socket.error
                # read at most need_read bytes in this call
                one_read = conn.recv(sock_state.need_read)
                #dbgPrint("\tread func fd: %d, one_read: %s, need_read: %d" % (fd, one_read, sock_state.need_read))
                # recv() returning no data means the peer closed the connection
                if len(one_read) == 0:
                    raise socket.error
                # process received data
                # append the new data to buff_read
                sock_state.buff_read += one_read
                # count the bytes read so far
                sock_state.have_read += len(one_read)
                # and subtract them from the bytes still needed
                sock_state.need_read -= len(one_read)
    
                # read protocol header
                if sock_state.have_read == 10:
                    # have_read == 10: the whole header has arrived. Parse it as a number and
                    # add it to need_read, i.e. how many body bytes are still to be read
                    header_said_need_read = int(sock_state.buff_read)
                    if header_said_need_read <= 0:
                        raise socket.error
                    sock_state.need_read += header_said_need_read
                    # clear the buffer so it will hold only the body
                    sock_state.buff_read = ''
                    # header done, go on to read the body
                    return "readcontent"
                elif sock_state.need_read == 0:
                    # recv complete, change state to process it
                    # need_read == 0: header and body are both complete
                    return "process"
                else:
                    # body still incomplete; e.g. the full packet 0000000002AB is 12 bytes in total
                    return "readmore"
            except (socket.error, ValueError), msg:
                try:
                    # errno 11 is EAGAIN on Linux: the non-blocking socket has no data right now
                    if msg.errno == 11:
                        #dbgPrint("11 " + msg)
                        return "retry"
                except:
                    pass
                return 'closing'
            
    
        #@profile
        def write(self, fd):
            # fetch this connection's state record
            sock_state = self.conn_state[fd]
            # get the socket object
            conn = sock_state.sock_obj
            #pdb.set_trace()
            print "popen_pipe",sock_state.popen_pipe,"========="
            if isinstance(sock_state.popen_pipe, file):
                try:
                    output = sock_state.popen_pipe.read()
                    print output
                except (IOError, ValueError), msg:
                    pass
                #have_send = conn.send("%010d%s" % (len(output), output))
                #todo: pipe output is not sent yet; this branch returns None,
                # which write2read() does not handle
    
            else:
                # resume sending from where the previous send() stopped
                last_have_send = sock_state.have_write
                try:
                    print "last_have_send===>",last_have_send
                    print "need_write===>",sock_state.need_write
    
                    # send the unsent remainder of the buffer; send() returns how many
                    # bytes it actually sent, which may be fewer than requested
                    have_send = conn.send(sock_state.buff_write[last_have_send:])
                    # have_write grows by the number of bytes just sent
                    sock_state.have_write += have_send
                    # need_write shrinks by the same amount
                    sock_state.need_write -= have_send
                    # nothing left to send and something was sent: the write is complete
                    if sock_state.need_write == 0 and sock_state.have_write != 0:
                        #send complete, re init status, and listen re-read
                        #sock_state.printState()
                        #dbgPrint('\n write data completed!')
                        return "writecomplete"
                    else:
                        return "writemore"
                except socket.error, msg:
                    return "closing"
    
    
        def run(self):
            '''
                Main event loop.
            '''
            while 1:
                # blocks until at least one monitored fd has an event
                epoll_list = self.epoll_sock.poll()
                # epoll_list holds an (fd, events) pair for each fd that has events
                for fd, events in epoll_list:
                    print fd,events,"#########"
                    # look up the record that setFd() stored under this fd
                    sock_state = self.conn_state[fd]
                    #print 'EPOLLHUP:',select.EPOLLHUP,'EPOLLERR:',select.EPOLLERR,'events',events 
                    # on EPOLLHUP or EPOLLERR, set this fd's state in self.conn_state to "closing"
                    if select.EPOLLHUP & events:
                        #dbgPrint("EPOLLHUP")
                        sock_state.state = "closing"
                    elif select.EPOLLERR & events:
                        #dbgPrint("EPOLLERR")
                        sock_state.state = "closing"
                    print fd,"to state_machine state is ",sock_state.state
                    # hand the fd to the state machine
                    self.state_machine(fd)
    
        def state_machine(self, fd):
            
            #time.sleep(0.1)
            #dbgPrint("\n - state machine: fd: %d, status: %s" % (fd, self.conn_state[fd].state))
            # look up this fd's record in self.conn_state and run the method that the
            # sm dictionary maps its current state to
            sock_state = self.conn_state[fd]
            print sock_state.state,fd,'sm msg'
            self.sm[sock_state.state](fd)
    
    class nbNet(nbNetBase):
        def __init__(self, addr, port, logic):
            #dbgPrint('\n__init__: start!')
            # per-connection records, keyed by fileno
            self.conn_state = {}
            '''
                Create the listening socket, bind it to the port and register it
                with epoll (level-triggered, the default; not ET).
            '''
            self.listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            self.listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.listen_sock.bind((addr, port))
            self.listen_sock.listen(10)
            self.setFd(self.listen_sock)
            self.epoll_sock = select.epoll()
            # LT for default, ET add ' | select.EPOLLET '
            # register the listening socket for EPOLLIN ("readable"); for a listening
            # socket that means a new connection is waiting to be accepted
            self.epoll_sock.register(self.listen_sock.fileno(), select.EPOLLIN)
            # keep the business-logic callback
            self.logic = logic
            # map each state to the method the state machine should call for it
            self.sm = {
                "accept" : self.accept2read,
                "read"   : self.read2process,
                "write"  : self.write2read,
                "process": self.process,
                "closing": self.close,
            }
            #dbgPrint('\n__init__: end, register no: %s' % self.listen_sock.fileno() )
    
        #@profile
        def process(self, fd):
            # fetch the connection's record (buff_read now holds the message body)
            sock_state = self.conn_state[fd]
            # pass the body to the business logic
            response = self.logic( sock_state.buff_read)
            #pdb.set_trace()
            # if the business logic returned nothing, reset this connection's record
            # and go back to waiting for the next request
            if response is None:
                conn = sock_state.sock_obj
                self.setFd(conn)
                print "==============> modify fd ",conn.fileno()
                self.conn_state[fd].state = "read"
                self.epoll_sock.modify(fd, select.EPOLLIN)
            else:
                # frame the response with the 10-byte length header
                sock_state.buff_write = "%010d%s" % (len(response), response)
                # total number of bytes to send
                sock_state.need_write = len(sock_state.buff_write)
                # switch the state to write
                sock_state.state = "write"
                # watch for EPOLLOUT (writable) instead of EPOLLIN
                self.epoll_sock.modify(fd, select.EPOLLOUT)
    
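        #@profile
        def accept2read(self, fd):
            # fd is the listening socket, whose state is always "accept"
            # accept the pending connection
            conn = self.accept(fd)
            # register the connection's fd with epoll, watching for EPOLLIN
            self.epoll_sock.register(conn.fileno(), select.EPOLLIN)
            # create its record in the global conn_state dictionary
            self.setFd(conn)
            # mark the new connection as "read" so that, on its next event, the state
            # machine knows how to handle it
            self.conn_state[conn.fileno()].state = "read"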
    
        #@profile
        def read2process(self, fd):
            """fd is fileno() of socket"""
            #pdb.set_trace()
            #print fd,'go to read'
            read_ret = ""
            try:
                # an EPOLLIN event on a connection fd in the "read" state:
                # read what the client sent
                read_ret = self.read(fd)
                print "read_ret ========>",read_ret
            except (Exception), msg:
                #dbgPrint(msg)
                read_ret = "closing"
            # decide what to do next from read()'s return value
            if read_ret == "process":
                '''
                    "process" means one complete message has been read,
                    so the business logic can run
                '''
                self.process(fd)
    
            elif read_ret == "readcontent":
                pass
            elif read_ret == "readmore":
                pass
            elif read_ret == "retry":
                pass
            elif read_ret == "closing":
                self.conn_state[fd].state = 'closing'
                # closing directly when error.
                self.state_machine(fd)
            else:
                raise Exception("impossible state returned by self.read")
    
        #@profile
        def write2read(self, fd):
            try:
                # called when an event arrives for a connection in the "write" state:
                # run write() on this fd
                write_ret = self.write(fd)
            except socket.error, msg:
                # on error, mark the connection as closing
                write_ret = "closing"
    
            if write_ret == "writemore":
    
                pass
            elif write_ret == "writecomplete":
                '''
                writecomplete: everything has been sent.
                Reset the record and set state = "read" to support persistent (keep-alive)
                connections, so the client can send another request on the same connection.
                '''
                sock_state = self.conn_state[fd]
                conn = sock_state.sock_obj
                self.setFd(conn)
                self.conn_state[fd].state = "read"
                self.epoll_sock.modify(fd, select.EPOLLIN)
            elif write_ret == "closing":
                # close the connection
                #dbgPrint(msg)
                self.conn_state[fd].state = 'closing'
                # closing directly when error.
                self.state_machine(fd)
        
    counter = 0
    if __name__ == '__main__':
        
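        def logic(d_in):
            print d_in
            # sendData_mh() in nbNetUtils treats a reply starting with "OK" as success;
            # returning None instead sends no reply and waits for the next request
            return "OK"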
        reverseD = nbNet('0.0.0.0', 9099, logic)
        reverseD.run()
    
    #!/usr/bin/env python
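    # coding: utf-8
    # nbNetUtils.py: helpers (STATE, dbgPrint, sendData_mh, ...) imported by the server above via "from nbNetUtils import *"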
    
    from daemon import Daemon
    import socket
    import select
    import time
    import pdb
    import sys, os
    import fcntl
    
    DEBUG = True
    #DEBUG = False
    
    from inspect import currentframe
    
    def get_linenumber():
        cf = currentframe()
        return str(cf.f_back.f_back.f_lineno)
    
    
    def dbgPrint(msg):
        if DEBUG:
            print get_linenumber(), msg
    
    def nonblocking(fd):
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    
    import signal,functools
    class TimeoutError(Exception): pass
    def timeout(seconds, error_message = 'Function call timed out'):
        def decorated(func):
            def _handle_timeout(signum, frame):
                raise TimeoutError(error_message)
            def wrapper(*args, **kwargs):
                signal.signal(signal.SIGALRM, _handle_timeout)
                signal.alarm(seconds)
                try:
                    result = func(*args, **kwargs)
                finally:
                    signal.alarm(0)
                return result
            return functools.wraps(func)(wrapper)
        return decorated
    
    @timeout(5)
    def connect_timeout(socket, host_port):
        return socket.connect(host_port)
    
    def sendData_mh(sock_l, host_l, data, single_host_retry = 3):
        """
        saver_l = ["localhost:50001","127.0.0.1:50001"]
        sock_l = [some_socket]
        sendData_mh(sock_l,saver_l,"this is data to send")
        """
        done = False
        for host_port in host_l:
            if done:
                break
            host,port =host_port.split(':')
            port = int(port)
            print "iter", host, port
            print "sock_l", sock_l
            retry = 0
            while retry < single_host_retry:
                try:
                    if sock_l[0] == None:
                        sock_l[0] = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                        print "connecting", host, port
                        #connect_timeout(sock_l[0], (host, port))
                        sock_l[0].settimeout(5)
                        sock_l[0].connect((host, port))
                    d = data
                    sock_l[0].sendall("%010d%s"%(len(d), d))
                    print "%010d%s"%(len(d), d)
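                    # assumes the header and the body each arrive in a single recv() call
                    count = sock_l[0].recv(10)
                    if not count:
                        print 'count', count
                        raise Exception("recv error", "recv error")
                    count = int(count)
                    buf = sock_l[0].recv(count)
                    print buf
                    if buf[:2] == "OK":
                        retry = 0
                        done = True
                        return True
                    # any other reply counts as a failed attempt; otherwise the loop
                    # would resend the same data forever
                    retry += 1
                except (Exception), msg:
                    try:
                        print msg.errno
                    except:
                        pass
                    # sock_l[0] is still None if socket.socket() itself failed
                    if sock_l[0] is not None:
                        sock_l[0].close()
                    sock_l[0] = None
                    time.sleep(1)
                    retry += 1
        return False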
    
    def sendData(sock_l, host, port, data):
        retry = 0 
        while retry < 3:
            try:
                if sock_l[0] == None:
                    sock_l[0] = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    sock_l[0].connect((host, port))
                    print "connecting"
                d = data
                sock_l[0].sendall("%010d%s"%(len(d), d)) 
                print "%010d%s"%(len(d), d)
                count = sock_l[0].recv(10)
                if not count:
                    raise Exception("recv error", "recv error")
                count = int(count)
                buf = sock_l[0].recv(count)
                print buf
                if buf[:2] == "OK":
                    retry = 0
                    break
                # any other reply counts as a failed attempt instead of looping forever
                retry += 1
            except:
                if sock_l[0] is not None:
                    sock_l[0].close()
                sock_l[0] = None
                retry += 1
    
    
    
    # initial status for state machine
    class STATE:
        def __init__(self):
            self.state = "accept"
            self.have_read = 0
            self.need_read = 10
            self.have_write = 0
            self.need_write = 0
            self.buff_write = ""
            self.buff_read = ""
            # sock_obj is a object
            self.sock_obj = ""
            self.popen_pipe = 0
        
        def printState(self):
            if DEBUG:
                dbgPrint('\n - current state of fd: %d' % self.sock_obj.fileno())
                dbgPrint(" - - state: %s" % self.state)
                dbgPrint(" - - have_read: %s" % self.have_read)
                dbgPrint(" - - need_read: %s" % self.need_read)
                dbgPrint(" - - have_write: %s" % self.have_write)
                dbgPrint(" - - need_write: %s" % self.need_write)
                dbgPrint(" - - buff_write: %s" % self.buff_write)
                dbgPrint(" - - buff_read:  %s" % self.buff_read)
                dbgPrint(" - - sock_obj:   %s" % self.sock_obj)
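    Both files frame every message as a 10-digit, zero-padded length header followed by the body (`"%010d%s"`), and `read()` reassembles it across several `recv()` calls. The sketch below re-expresses that framing in Python 3 as an incremental decoder. `frame`, `FrameReader` and `HEADER_LEN` are hypothetical names. Unlike `read()`, which never asks `recv()` for more than `need_read` bytes, this version buffers whatever arrives and may return several messages from one chunk.

```python
HEADER_LEN = 10  # ten ASCII digits, e.g. b"0000000003"


def frame(payload: bytes) -> bytes:
    """Prefix payload with its length as 10 zero-padded digits (like "%010d%s")."""
    return b"%010d" % len(payload) + payload


class FrameReader:
    """Accumulate recv() chunks of any size and return the complete message bodies."""

    def __init__(self):
        self.buf = b""

    def feed(self, chunk: bytes):
        self.buf += chunk
        bodies = []
        while len(self.buf) >= HEADER_LEN:
            size = int(self.buf[:HEADER_LEN])  # raises ValueError on a non-numeric header
            if size <= 0:
                raise ValueError("bad length header")  # nbNet also rejects this
            if len(self.buf) < HEADER_LEN + size:
                break  # body incomplete: wait for more data ("readmore")
            bodies.append(self.buf[HEADER_LEN:HEADER_LEN + size])
            self.buf = self.buf[HEADER_LEN + size:]
        return bodies


reader = FrameReader()
wire = frame(b"AB") + frame(b"xyz")  # b"0000000002AB0000000003xyz"
print(reader.feed(wire[:7]))    # []: header not complete yet
print(reader.feed(wire[7:13]))  # [b'AB']
print(reader.feed(wire[13:]))   # [b'xyz']
```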
    

      

  • Original article: https://www.cnblogs.com/nerdlerss/p/9035928.html