zoukankan      html  css  js  c++  java
  • python-高级编程-05-异步IO

     【异步非阻塞IO】

    ------------------------------------------------------------------------------------------------------------------- 

    小明和小强是笔友,他们通过有邮件的方式联系,小明发一封,小强回一封

    邮差有点时候天气好,早上发出的信件,晚上就能收的到,然后有的时候遇到意外,

    可能好几天都不能收到,小强就在邮箱前面等啊等,一直等到天荒地老

     cont =1

    mailbox = xxxxx

    while 1:

      mail = mailbox.cleck()

      #look mail

      sleep(cont)

    但是这样有个问题

    cont的大小 如果小了就非常占用cpu 设置太大就效率低

    我们能如果节约呢?

    小强在邮箱上放了个旗子,让邮递员放好了信就把上面的旗子立起来,然后小强知道了 就去拿信

    这样小强就能在等信的时候做其他的事情了。

    这就是异步io的思想。

    【select pool   epoll 】

    --------------------------------------------------------------------------------------------------------------- 

    随时小强的人脉越来越广,他就搞了很多邮箱,一个邮箱对应一个人,然后上面都放好旗子

    select 就相当于循环的检查邮箱上面的旗子,一有旗子立起来的话就通知小强,但是他的限制只有1024个(这里的1024 在操作系统里面值得并不是同时打开文件描述符的个数,而是号,当文件描述符超过了这个号,即便是前头的都关闭了,还是不能够增加新的)

    pool 跟select的区别就是去掉了1024的限制,但是如果连接增多那么他还是会面的很慢。因为没次循环都要对所有邮箱进行检查。

    [epoll] -- epoll 是linux内核的可扩展I/O事件通知机制,特点就是让需要大量操作文件描述符的程序得以 更优异的性能。

    select和poll的时间复杂度是O(n) 而epoll 是O(1)

    这里的e 值得event 事件

    说白了就是 给每个邮箱编号然后改造成电子邮箱,一但有邮件过来了,立马会在手机app上面显示 xxx号邮箱 有邮件啦!!!

     【epoll的ET和LT】

    --------------------------------------------------------------------------------------------------------------- 

    ET:边缘触发  来了信,手机上面只响一下

    LT:水平触发 来了信,手机一直响,一直到你打开app处理

    ET在处理的时候,如果处理方式不够谨慎 ,很容易造成消息丢失

    epoll 默认是 LT

    ######################################################################

    #!/usr/bin/env python
    #coding:utf-8
    
    ###################
    #from:Rico.xia    #
    #time:2017-08-19  #
    ###################
    
    import socket
    import time
    import select
    import pdb
    
    #############################################
    class STATE:
        def __init__(self):
            self.state = 'accept'
            self.have_read = 0
            self.need_read = 5
            self.have_write = 0
            self.need_write = 5
            self.buff_write = ""
            self.buff_read = ""
            self.sock_obj = ""
    
    ##############################################
    class nbNetBase:
        def setFd(self,sock):
    
            #实例化STATE() 并初始化参数
            tmp_state = STATE()
    
            #将socket对象存到实例化的对象的字段中
            tmp_state.sock_obj = sock
    
            #获取这个socket的文件描述符,并作为key,value 为实例化的状态(STATE)对象。
            self.conn_state[sock.fileno()]=tmp_state
    
    
        def accept(self,fd):
    
            #dbpPrint("
     --accept start")
            #获取sock对象
            sock = self.conn_state[fd].sock_obj
            #获取客户端的sock对象,和ip
            conn,addr  = sock.accept()
            #设置为非阻塞状态
            conn.setblocking(0)
            #返回
            return conn
    
        def close(self,fd):
            print 'closeing'
            try:
                sock = self.conn_state[fd].sock_obj
                sock.close()
            finally:
                self.epoll_sock.unregister(fd)
                self.conn_state.pop(fd)
        
        def read(self,fd):
            print 'readng'
            try:
                #获取客户端的sock信息
                sock_state = self.conn_state[fd]
                conn  = sock_state.sock_obj
                #防止宇宙射线导致的字符反转
                if sock_state.need_read <=0:
                    raise socket.error
                #第一读取,读取之前定义好的需要读取的字节数
                one_read = conn.recv(sock_state.need_read)
                #如果读取的信息为0就抛出异常
                if len(one_read) == 0:
                    raise socket.error
                #判断开头是不是回车 在telnet里面敲击一次回车会发送数据如果没有输入就是 
    
                if one_read[0:2] == "
    ":
                    one_read = one_read[2:]
                #buff里面缓存住读取内容
                sock_state.buff_read += one_read
                #已经读了的加上去读取的字节数
                sock_state.have_read += len(one_read)
                #需要读取的剪掉读取的字节数
                sock_state.need_read -= len(one_read)
                #sock_state.printState()
    
                #这里如果我们已经读了5个字节的头部之后 我们需要去读取需要出的处理的数据
                #我们的协议是 00003abc -->00003cbc 我们通过头部知道需要读多少处理数据
                if sock_state.have_read == 5:
                    header_said_need_read = int(sock_state.buff_read)
                    if header_said_need_read <= 0:
                        raise socket.error
                    sock_state.need_read += header_said_need_read
                    sock_state.buff_read = ""
                    #如果满足条件 返回 readcontent  取阅读内容
                    return "readcontent"
                elif sock_state.need_read == 0:
                    #如果需要读取的已经读完了 那么我们处理数据
                    return "process"
    
                else:
                    return "readmore"
    
            except (socket.error,ValueError),msg:
                try:
                    if msg.error == 11:
                        return "retry"
                except:
                    pass
                return "closing"
    
        def write(self,fd):
            #跟read方法类似
            sock_state = self.conn_state[fd]
            conn = sock_state.sock_obj
            last_have_send = sock_state.have_write
            try:
                have_send  = conn.send(sock_state.buff_write[last_have_send:])
                sock_state.have_write += have_send
                sock_state.need_write -= have_send
                if sock_state.need_write == 0 and sock_state.have_write != 0:
                    return "writecomplete"
                else:
                    return "writemore"
            except socket.error,msg:
                return "closing"
    
        def run(self):
            while True:
                # poll()返回的epoll_list就是有事件发生的fd的list
                # 需要在循环中按照event的类型分别处理,一般分为以下几种类型
                #  EPOLLIN :表示对应的文件描述符可以读;
                #  EPOLLOUT:表示对应的文件描述符可以写;
                #  EPOLLPRI:表示对应的文件描述符有紧急的数据可读;一般不需要特殊处理
                #  EPOLLERR:表示对应的文件描述符发生错误;后面这两种需要关闭socket
                #  EPOLLHUP:表示对应的文件描述符被挂断
                #我们这里出现 EPOLLERR 和 EPOLLERR 情况将改变成'closing' 然后将fd扔到状态机中
                epoll_list = self.epoll_sock.poll()
                print epoll_list
                for fd,events in epoll_list:
                    #dbgPrint("
     --epoll return fd:%d.event:%s"%(fd,events))
                    sock_state = self.conn_state[fd]
                    if select.EPOLLHUP & events:
                        sock_state.state='closing'
                    elif select.EPOLLERR & events:
                        sock_state.state = 'closing'
                    self.state_machine(fd)
    
    
        def state_machine(self,fd):
            #dbgPrint('
     - state machine:fd:%d,status:%s'%(fd,self.conn_state[fd].state))
            print 'machine is use'
            ##根据连接状态做对应处理
            #获取状态对象
            sock_state =self.conn_state[fd]
            #根据字典sm里的处理方法处理其对应的状态
            self.sm[sock_state.state](fd)
    
    class nbNet(nbNetBase):
        def __init__(self,addr,port,logic):
            dbgPrint("
     __init__:start !")
            #定义一个空字典
            self.conn_state = {}
            # 初始化监听socket socket.AF_INET指的是以太网 socket.SOCK_STREAM指的是TCP
            self.listen_sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
            #设置socket 开启SO_REUSEADDR,这样当监听端口处于各种xxx_WAIT的状态的时候 也可以listen、bind
            self.listen_sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
            #绑定端口
            self.listen_sock.bind((addr,port))
            # 指定backlog数
            self.listen_sock.listen(10)
            ##################################
            #设置文件描述符的相关信息
            self.setFd(self.listen_sock)
            #实例化epool对象
            self.epoll_sock = select.epoll()
            #将文件描述符传给epoll 并告诉它只只关注EPOLLIN,即connect过来的连接
            self.epoll_sock.register(self.listen_sock.fileno(),select.EPOLLIN)
            #初始化方法数据
            self.logic = logic
            #初始化逻辑处理数据,告诉状态机如果程序在某一状态应该用什么方法处理
            self.sm = {"accept":self.accept2read,
                       'read':self.read2process,
                       "write":self.write2read,
                       "process":self.process,
                       'closing':self.close}
    
        def accept2read(self,fd):
            #获取客户端socket对象
            conn = self.accept(fd)
            #将客户端也注册一次epoll,监听EPOLLIN状态,也就当客户端有数据的时候
            self.epoll_sock.register(conn.fileno(),select.EPOLLIN)
            #注册客户端fd
            self.setFd(conn)
            #初始化设置为状态为read
            self.conn_state[conn.fileno()].state = 'read'
    
        def read2process(self,fd):
            read_ret = ""
            try:
                #去读取
                read_ret = self.read(fd)
            except(Exception),msg:
                #dbgPrint(msg)
                read_ret = 'closing'
    
            #我们已经读取头部了根据read方法的返回做各种处理
            if read_ret == 'process':
                self.process(fd)
            elif read_ret == "readcontent":pass
            elif read_ret == 'readmore':pass
            elif read_ret == 'retry':pass
            elif read_ret == 'closing':
                self.conn_state[fd].state = 'closing'
                self.state_machine(fd)
            else:
                raise Exception('impossible state returned by self.read')
        def process(self,fd):
            #获取socket 对象
            sock_state = self.conn_state[fd]
            #通过传入的logic方法得到要返回给客户端的值
            response = self.logic(sock_state.buff_read)
            sock_state.buff_write = "%05d%s" %(len(response),response)
            sock_state.need_write = len(sock_state.buff_write)
            sock_state.state = 'write'
            self.epoll_sock.modify(fd,select.EPOLLOUT)
        def write2read(self,fd):
            try:
                write_ret = self.write(fd)
            except socket.error,msg:
                write_ret = 'closing'
            if write_ret == 'writemore':
                pass
            elif write_ret == 'writecomplete':
                sock_state = self.conn_state[fd]
                conn = sock_state.sock_obj
                self.setFd(conn)
                self.conn_state[fd].state = 'read'
                self.epoll_sock.modify(fd,select.EPOLLIN)
            elif write_ret == 'cldsing':
                self.conn_state[fd].state = 'closing'
                self.state_machine(fd)
    
    if __name__ == '__main__':
        def logic(d_in):
            return (d_in[::-1])
        #将参数传到nbNet类并且实例化,监听本地 6789 端口
        reverseD = nbNet('0.0.0.0',6789,logic)
        #执行run函数
        reverseD.run()
    

      

  • 相关阅读:
    【noi 2.6_9270】&【poj 2440】DNA(DP)
    【noi 2.6_9271】奶牛散步(DP)
    【noi 2.6_747】Divisibility(DP)
    【noi 2.6_7113】Charm Bracelet(DP)
    【noi 2.6_9268】酒鬼(DP)
    【noi 2.6_9267】核电站(DP)
    【noi 2.6_9265】取数游戏(DP)
    【noi 2.6_2000】&【poj 2127】 最长公共子上升序列 (DP+打印路径)
    【noi 2.6_8786】方格取数(DP)
    【noi 2.6_90】滑雪(DP)
  • 原文地址:https://www.cnblogs.com/nerdlerss/p/7359584.html
Copyright © 2011-2022 走看看