zoukankan      html  css  js  c++  java
  • io模型

    首先我们来定义流的概念:

        一个流可以是文件,socket,pipe等可以进行I/O操作的内核对象。不管是文件,还是套接字(socket),还是管道(pipe),我们都可以把他们看作流。

    之后我们来讨论I/O操作,通过read,我们可以从流中读入数据;通过write,我们可以往流中写入数据。现在假定1种情形,我们需要从流中读数据,但是流中还没有数据,(典型的例子为,客户端要从socket读数据,但是服务器端还没有把数据传回来),这时候该怎么办?等呗,怎么等呢?下面写着

    阻塞:阻塞是个什么概念呢?比如某个时候你在等快递,但是你还不知道快递什么时候过来,而且你也没有别的事可以干(或者说接下来的事要等快递来了才能做);那么你可以去睡觉了,因为你知道快递把货送来时一定会给你打电话(假定一定能叫醒你)。我们在没有学习线程进程和yield前基本都是这样的阻塞程序

    非阻塞忙轮询:接着上面等快递的例子,如果用忙轮询的方法,那么你需要知道快递员的手机号,然后每分钟给他打个电话:“你到了没?”

        很明显一般人不会用第二种做法,不仅显得无脑,浪费话费不说,还占用了快递员大量的时间。

        大部分程序也不会用第二种做法,因为第一种方法经济而简单,经济是指消耗很少的CPU时间,如果线程睡眠了,就掉出了系统的调度队列,暂时不会去瓜分CPU宝贵的时间片。

        为了了解阻塞是如何进行的,我们来讨论缓冲区,以及内核缓冲区,最终把I/O事件解释清楚。缓冲区的引入是为了减少频繁I/O操作而引起频繁的系统调用(你知道它很慢的),当你操作一个流时,更多的是以缓冲区为单位进行操作,这是相对于用户空间而言。对于内核来说,也需要缓冲区。

    假设有一个管道,进程A为管道的写入方,B为管道的读出方。假设一开始内核缓冲区是空的,B作为读出方,被阻塞着。然后首先A往管道写入,这时候内核缓冲区由空的状态变到非空状态,内核就会产生一个事件告诉B该醒来了,这个事件姑且称之为“缓冲区非空”。但是“缓冲区非空”事件通知B后,B却还没有读出数据;且内核许诺了不能把写入管道中的数据丢掉这个时候,A写入的数据会滞留在内核缓冲区中,如果内核也缓冲区满了,B仍未开始读数据,最终内核缓冲区会被填满,这个时候会产生一个I/O事件,告诉进程A,你该等等(阻塞)了,我们把这个事件定义为“缓冲区满”。假设后来B终于开始读数据了,于是内核的缓冲区空了出来,这时候内核会告诉A,内核缓冲区有空位了,你可以从长眠中醒来了,继续写数据了,我们把这个事件叫做“缓冲区非满”。也许事件Y1已经通知了A,但是A也没有数据写入了,而B继续读出数据,知道内核缓冲区空了。这个时候内核就告诉B,你需要阻塞了!,我们把这个时间定为“缓冲区空”。

        这四种情形涵盖了四个I/O事件,内核缓冲区满,内核缓冲区空,内核缓冲区非空,内核缓冲区非满。这四个I/O事件是进行阻塞同步的根本。(如果不能理解“同步”是什么概念,请学习操作系统的锁,信号量,条件变量等任务同步方面的相关知识)。

    然后我们来说说阻塞I/O的缺点。但是阻塞I/O模式下,一个线程只能处理一个流的I/O事件。如果想要同时处理多个流,要么多进程(fork),要么多线程(pthread_create),很不幸这两种方法效率都不高。于是再来考虑非阻塞忙轮询的I/O方式,我们发现可以同时处理多个流(把一个流从阻塞模式切换到非阻塞模式再此不予讨论)

     我们只要不停的把所有流从头到尾问一遍,又从头开始。这样就可以处理多个流了,但这样的做法显然不好,因为如果所有的流都没有数据,那么只会白白浪费CPU。这里要补充一点,阻塞模式下,内核对于I/O事件的处理是阻塞或者唤醒,而非阻塞模式下则把I/O事件交给其他对象(后文介绍的select以及epoll)处理甚至直接忽略。

        为了避免CPU空转,可以引进一个代理(一开始有一位叫做select的代理,后来又有一位叫做poll的代理,不过两者的本质是一样的)。这个代理比较厉害,可以同时观察许多流的I/O事件,在空闲的时候,会把当前线程阻塞掉,当有一个或多个流有I/O事件时,就从阻塞态中醒来,于是我们的程序就会轮询一遍所有的流(于是我们可以把“忙”字去掉了)。

    i/o多路复用

     于是,如果没有I/O事件产生,我们的程序就会阻塞在select处。但是依然有个问题,我们从select那里仅仅知道了,有I/O事件发生了,但却并不知道是那几个流(可能有一个,多个,甚至全部),我们只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。

    异步i/o

        但是使用select,我们有O(n)的无差别轮询复杂度,同时处理的流越多,每一次无差别轮询时间就越长。再次说了这么多,终于能好好解释epoll了。
        epoll可以理解为event poll,不同于忙轮询和无差别轮询,epoll只会把哪个流发生了怎样的I/O事件通知我们。此时我们对这些流的操作都是有意义的(复杂度降低到了O(1))。
    import select 导入select模块
    
    epoll = select.epoll() 创建一个epoll对象
    
    epoll.register(文件句柄,事件类型) 注册要监控的文件句柄和事件
    
    事件类型:
    
      select.EPOLLIN    可读事件
    
      select.EPOLLOUT   可写事件
    
      select.EPOLLERR   错误事件
    
      select.EPOLLHUP   客户端断开事件
    
    epoll.unregister(文件句柄)   销毁文件句柄
    
    epoll.poll(timeout)  当文件句柄发生变化,则会以列表的形式主动报告给用户进程,timeout
    
                         为超时时间,默认为-1,即一直等待直到文件句柄发生变化,如果指定为1
    
                         那么epoll每1秒汇报一次当前文件句柄的变化情况,如果无变化则返回空
    
    epoll.fileno() 返回epoll的控制文件描述符(Return the epoll control file descriptor)
    
    epoll.modfiy(fineno,event) fineno为文件描述符 event为事件类型  作用是修改文件描述符所对应的事件
    
    epoll.fromfd(fileno) 从1个指定的文件描述符创建1个epoll对象
    
    epoll.close()   关闭epoll对象的控制文件描述符
    #!/usr/bin/env python
    #-*- coding:utf-8 -*-
    
    import socket
    import select
    import Queue
    
    #创建socket对象
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    #设置IP地址复用
    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    #ip地址和端口号
    server_address = ("127.0.0.1", 8888)
    #绑定IP地址
    serversocket.bind(server_address)
    #监听,并设置最大连接数
    serversocket.listen(10)
    print  "服务器启动成功,监听IP:" , server_address
    #服务端设置非阻塞
    serversocket.setblocking(False)  
    #超时时间
    timeout = 10
    #创建epoll事件对象,后续要监控的事件添加到其中
    epoll = select.epoll()
    #注册服务器监听fd到等待读事件集合
    epoll.register(serversocket.fileno(), select.EPOLLIN)
    #保存连接客户端消息的字典,格式为{}
    message_queues = {}
    #文件句柄到所对应对象的字典,格式为{句柄:对象}
    fd_to_socket = {serversocket.fileno():serversocket,}
    
    while True:
      print "等待活动连接......"
      #轮询注册的事件集合,返回值为[(文件句柄,对应的事件),(...),....]
      events = epoll.poll(timeout)
      if not events:
         print "epoll超时无活动连接,重新轮询......"
         continue
      print "" , len(events), "个新事件,开始处理......"
      
      for fd, event in events:
         socket = fd_to_socket[fd]
         #如果活动socket为当前服务器socket,表示有新连接
         if socket == serversocket:
                connection, address = serversocket.accept()
                print "新连接:" , address
                #新连接socket设置为非阻塞
                connection.setblocking(False)
                #注册新连接fd到待读事件集合
                epoll.register(connection.fileno(), select.EPOLLIN)
                #把新连接的文件句柄以及对象保存到字典
                fd_to_socket[connection.fileno()] = connection
                #以新连接的对象为键值,值存储在队列中,保存每个连接的信息
                message_queues[connection]  = Queue.Queue()
         #关闭事件
         elif event & select.EPOLLHUP:
            print 'client close'
            #在epoll中注销客户端的文件句柄
            epoll.unregister(fd)
            #关闭客户端的文件句柄
            fd_to_socket[fd].close()
            #在字典中删除与已关闭客户端相关的信息
            del fd_to_socket[fd]
         #可读事件
         elif event & select.EPOLLIN:
            #接收数据
            data = socket.recv(1024)
            if data:
               print "收到数据:" , data , "客户端:" , socket.getpeername()
               #将数据放入对应客户端的字典
               message_queues[socket].put(data)
               #修改读取到消息的连接到等待写事件集合(即对应客户端收到消息后,再将其fd修改并加入写事件集合)
               epoll.modify(fd, select.EPOLLOUT)
         #可写事件
         elif event & select.EPOLLOUT:
            try:
               #从字典中获取对应客户端的信息
               msg = message_queues[socket].get_nowait()
            except Queue.Empty:
               print socket.getpeername() , " queue empty"
               #修改文件句柄为读事件
               epoll.modify(fd, select.EPOLLIN)
            else :
               print "发送数据:" , data , "客户端:" , socket.getpeername()
               #发送数据
               socket.send(msg)
    
    #在epoll中注销服务端文件句柄
    epoll.unregister(serversocket.fileno())
    #关闭epoll
    epoll.close()
    #关闭服务器socket
    serversocket.close()
    
    服务端代码
    #!/usr/bin/env python
    #-*- coding:utf-8 -*-
    
    import socket
    
    #创建客户端socket对象
    clientsocket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    #服务端IP地址和端口号元组
    server_address = ('127.0.0.1',8888)
    #客户端连接指定的IP地址和端口号
    clientsocket.connect(server_address)
    
    while True:
        #输入数据
        data = raw_input('please input:')
        #客户端发送数据
        clientsocket.sendall(data)
        #客户端接收数据
        server_data = clientsocket.recv(1024)
        print '客户端收到的数据:'server_data
        #关闭客户端socket
        clientsocket.close() 
    
    客户端代码

    本博客一般给自己看的

    上面参考了http://www.cnblogs.com/maociping/p/5132583.html并整理了一下

    其实还有一个更加好用的模块叫selectors

    他会在我们调用i/o多路复用的时候根据现场环境自动的选择一个更有效率的方法

    通常windows下只能使用select

    而在linux下可以根据需求使用poll和epoll

    我们编程也要考虑兼容性的,不可能到一个系统在编写一边吧,那样会哭的。

    来自官网的解释

    https://docs.python.org/3/library/selectors.html?highlight=selectors#module-selectors

    import selectors
    import socket
    
    sel=selectors.DefaultSelector()#调用别名实例化
    
    def accept(sock,mask):#定义一个socket监听方法
        conn,addr=sock.accept()#将socket实例化的socket对象给conn,addr
        print('accepted',conn,'from',addr)#打印一下看看
        conn.setblocking(False)#设置conn为非阻塞
        sel.register(conn,selectors.EVENT_READ,read)#将接收到的conn进行监听绑定一个read方法,
    def read(conn,mask):
        data = conn.recv(1000)#监听
        if data:
            print('echoing' ,repr(data),'to',conn)#显示一下一字符串的方式的data
            conn.send(data)#将send发送回去
        else:
            print('closing',conn)#打印‘关闭’,关闭conn
            sel.unregister(conn)#取消conn的监听释放资源
            conn.close()#关闭conn
    sock=socket.socket()
    sock.bind(('localhost',8088))
    sock.listen(100)
    sock.setblocking(False)
    sel.refister(sock,selectors.EVENT_READ,accept)#将sock加入监听,添加accept方法
    while True:
        events=sel.select()#这里开始进行监听
        for key,mask in events:
            callback=key.data##监听到的一个select提取当中key.data就是accept方法
            callback(key.fileobj,mask)#将key.fileobj也就是socket对象传进方法内

    这里selectors是有层级关系的,他会根据当前系统环境优选择

    类层结构优先级

    +  -  SelectSelector   低
    +  -  PollSelector 
    +  -  EpollSelector 
    +  -  DevpollSelector 
    +  -  KqueueSelector   高

    关于

    sel.refister(sock,selectors.EVENT_READ,accept)中
    不变含义
    EVENT_READ 可供阅读
    EVENT_WRITE 可供写
     
  • 相关阅读:
    SQL SERVER常用的统计用法
    SQL SERVER将多行数据合并成一行(转载)
    RabbitMQ安装与搭建
    CentOS 配置vncserver
    sql server2008禁用远程连接
    CentOS系统常用命令
    CentOS系统配置redis
    CentOS系统配置solr
    利用Warensoft Stock Service编写高频交易软件--客户端驱动接口说明
    蛙人高频交易拆单策略—蛙人高频软件结构及使用说明
  • 原文地址:https://www.cnblogs.com/935415150wang/p/7219393.html
Copyright © 2011-2022 走看看