zoukankan      html  css  js  c++  java
  • 事件驱动、五种I/O操作、I/O多路复用select和epoll

    1.1 事件驱动

      1、什么是事件驱动

          定义:就是根据不同事件触发处理不同的事情(根据事件做反应),将要处理的事件加入队列中而不是立刻处理

      2、使用UI编程的点击事件理解事件驱动

          1) 目前大部分的UI编程都是事件驱动模型,如很多UI平台都会提供onClick()事件(点击事件)

          2) 这个事件就代表鼠标按下事件。事件驱动模型大体思路如下:

          第一步:首先得有一个消息队列,来存放要触发的事件

          第二步:鼠标按下时,往这个队列中增加一个点击事件(消息),而不是立刻执行;

          第三步:有个循环,不断从队列取出事件,根据不同的事件,调用不同的函数,如onClick()、onKeyDown()等

          第四步:事件(消息)一般都各自保存各自的处理函数指针,这样,每个消息都有独立的处理函数;

      3、创建一个线程监控点击事件的缺点

          1) CPU资源浪费,可能鼠标点击的频率非常小,但是扫描线程还是会一直循环检测,这会造成很多的CPU资源浪费;

          2) 如果扫描鼠标点击是堵塞的,又会出现下面这样的问题,如果点击后要执行一件事,

                这件事如果是阻塞的需要花5秒,那么在这五秒里鼠标就无法点击第二下

          3  如果一个循环需要扫描的设备非常多,这又会引来响应时间的问题;

    1.2 五种I/O操作

      1、I/O的实质是什么?

          1. I/O的实质是将硬盘中的数据,或收到的数据实现从内核态 copy到 用户态的过程

          2. 本文讨论的背景是Linux环境下的network IO。

      2、5中I/O操作的本质

        1. I/O操作举例说明:

            1)刚才说了,对于一次IO访问(以read举例),数据会先被拷贝到操作系统内核的缓冲区中

            2)然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间

        2. 当一个read操作发生时,它会经历两个阶段:

            1)等待数据准备 (Waiting for the data to be ready),等待将数据(比如文件)读到内核的内存中

            2)将数据从内核拷贝到用户的进程里进程中 (Copying the data from the kernel to the process)

        3. 因为这两个阶段,linux系统产生了下面五种网络模式的方案

            1)- 阻塞 I/O(blocking IO)

            2)- 非阻塞 I/O(nonblocking IO)

            3)- I/O 多路复用( IO multiplexing)

            4)- 异步 I/O(asynchronous IO)

            5)- 信号驱动 I/O( signal driven IO)

            注:由于signal driven IO在实际中并不常用,所以我这只提及剩下的四种IO Model。

      3、与I/O相关的五个重要概念

                               1. 用户空间和内核空间

                               2. 进程切换

                               3. 进程的阻塞

                               4. 文件描述符

                               5. 缓存 I/O

        1. 第一个概念:用户空间与内核空间

            1. 现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)

            2. 操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。

            3. 为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间

                划分为两部分:一部分为内核空间,一部分为用户空间

            4. 针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,

                而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。

        2. 第二个概念:进程切换

            1. 为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复以前挂起的某个进程的执行。这种行为被称为进程切换

            2. 从一个进程的运行转到另一个进程上运行,这个过程中经过下面这些变化:

              1)保存处理机上下文,包括程序计数器和其他寄存器。

              2)更新PCB信息。

              3)把进程的PCB移入相应的队列,如就绪、在某事件阻塞等队列。

              4)选择另一个进程执行,并更新其PCB。

              5)更新内存管理的数据结构。

              6)恢复处理机上下文。

              注:总而言之就是很耗资源

        3. 第三个概念:进程的阻塞

              1. 正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚

                  未到达或无新工作做等,则由系统自动执行阻塞原语(Block),使自己由运行状态变为阻塞状态。

              2. 可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行态的进程(获得CPU),

                  才可能将其转为阻塞状态

              3. 当进程进入阻塞状态,是不占用CPU资源的。

        4. 第四个概念:文件描述符fd

            1. 文件描述符在形式上是一个非负整数,实际上,它是一个索引值,

                指向内核为每一个进程所维护的该进程打开文件的记录表

            2. 当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符

            3. 在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开

            4. 但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统

        5. 第五个概念:缓存 I/O

            1. 缓存 I/O 又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O

            2. 在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中。

            3. 数据会先拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核缓冲区拷贝到应用程序的地址空间

            4. 缓存 I/O 的缺点:

                数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作,这些数据拷贝操作所

                带来的 CPU 以及内存开销是非常大的。

       4.1 阻塞 I/O(blocking IO)

          在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:

          1) 进程首先处于阻塞状态调用recv方法,想要去接收数据

          2) 然后就会到内核中去读取数据,如果开始内核中没有data

          3) 进程就会阻塞,过一会数据来了,然后进程就将数据从内核copy到用户那里

          4) Copy结束后就会return OK,用户态就收到数据了

      4.2 非阻塞 I/O(nonblocking IO)

          1. linux下,可以通过设置socket使其变为non-blocking

          2. 当对一个non-blocking socket执行读操作时,流程是这个样子:

              1)当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block

                   用户进程,而是立刻返回一个error。

              2)从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。

              3)用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作

              4)一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝

                  到了用户内存,然后返回。

          注:所以,nonblocking IO的特点是用户进程需要不断的主动询问kernel数据好了没有。

      4.3  I/O 多路复用( IO multiplexing)

          特点: 用户还是要等待数据从kernel拷贝到用户进程

          1. IO multiplexing就是我们说的select,poll,epoll,有些地方也称这种IO方式为event driven IO

          2. select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。

          3. 它的基本原理就是select,poll,epoll这个function会不断的轮询所负责的所有socket,当某

              个socket有数据到达了,就通知用户进程

              1)当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket

              2)当任何一个socket中的数据准备好了,select就会返回

              3)这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。

              4)虽然是多并发但是还有一些卡,因为都要等待数据从kernel拷贝到用户进程

              5)其实select,poll,epoll实质就是循环着接收数据

              6)select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。

      4.4 异步 I/O(asynchronous IO)(用户完全不用等)

          Linux下的asynchronous IO其实用得很少。先看一下它的流程:

          1) 用户进程发起read操作之后,立刻就可以开始去做其它的事

          2) 而另一方面,从kernel的角度,当它收到一个asynchronous read之后,首先它会立刻返回,

              所以不会对用户进程产生任何block(首先告诉用户可以去做其他事情了)

          3) 然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送

              一个signal,告诉它read操作完成了。(然后告诉用户所有东西都弄好了)

      5、同步I/O和异步I/O区别

          1)同步I/O,异步I/O,多路复用I/O都是同步I/O

          2)两者的区别就在于synchronous IO做”IO operation”的时候会将process阻塞

          3)而asynchronous IO则不一样,当进程发起IO 操作之后,就直接返回再也不理睬了,直到kernel发送一

               个信号,告诉进程说IO完成,在这整个过程中,进程完全没有被block

    1.3 I/O 多路复用之select、poll、epoll详解

      注:无论是sellect、poll、epoll他们三个都是在I/O多路复用中检测多个socket链接,与数据从内核态到数据态没有什么关系

      1、sellect、poll、epoll三者概念区别 (多路io就绪通知方法)

        1. select 能监控数量有限,不能告诉用户程序具体哪个连接有数据

            1. select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是

                它所剩不多的优点之一

            2. select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不

                过可以通过修改宏定义甚至重新编译内核的方式提升这一限制

            3. select监控socket连接时不能准确告诉用户是哪个,比如:现在用socket监控10000链接,如果其中有一个

                链接有数据了,select就会告诉用户程序,你有socket来数据了,那样就只能自己循环10000次判断哪个活跃

        2. poll和select一样,仅仅去除了最大监控数量

            1. poll和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制

            2. 可以理解为poll是一个过渡阶段,大家也都不用他

        3. epoll (不仅没有最大监控数量限制,还能告诉用户程序哪个连接有活跃)

            注:epoll被认为是linux下性能最好的多路io就绪通知方法

            1. epoll直到Linux2.6(centos6以后)才出现了由内核直接支持

            2.它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法

            3. epoll最重要的优点是他可以直接告诉用户程序哪一个,比如现在用epoll去监控10000个socket链接,交

                给内核去监测,现在有一个连接有数据了,在有有一个连接有数据了,epoll会直接高数用户程序哪个连接有数据了

      2、epoll能实现高并发原理

          1. epoll() 中内核则维护一个链表,epoll_wait 直接检查链表是不是空就知道是否有文件描述符准备好了。

          2. 在内核实现中 epoll 是根据每个 sockfd 上面的与设备驱动程序建立起来的回调函数实现的。

          3. 某个 sockfd 上的事件发生时,与它对应的回调函数就会被调用,来把这个 sockfd 加入链表,其他处于“空闲的”状态的则不会。

          4. epoll上面链表中获取文件描述,这里使用内存映射(mmap)技术, 避免了复制大量文件描述符带来的开销

          内存映射(mmap):内存映射文件,是由一个文件到一块内存的映射,将不必再对文件执行I/O操作

      3、select实现单线程下的多并发(必须是非阻塞模式)

    import select
    import socket
    import queue
    
    server = socket.socket()
    server.bind(("localhost",9999))
    server.listen(1000)
    
    server.setblocking(False)   #设置非阻塞模式,recv没数据不阻塞,server.accept不阻塞但报错
    msg_dic = {}
    
    #因为刚开没有连接可以监控,所以将server自己交给内核监测,只要server自己活动了就代表有人连我了
    inputs = [server,]     #有多少连接需要监测就必须放到inputs列表中,将列表交给select相当于交给内核
    outputs = []
    #第一个inputs是指定要内核监控那些链接,链接中只要有一个有数据就返回所有连接
    #第二个outputs是存放还未发送的数据,下次就会发送
    #第三个inputs也是监控所有连接,但是只有连接出问题是才返回所有连接
    
    while True:
        readable,writeable,exceptional = select.select(inputs, outputs, inputs)
        print(readable,writeable,exceptional)
        for r in readable:
            if r is server:     #代表来了一个新链接
                conn,addr = server.accept()
                print("来了一个新链接:",addr)
                inputs.append(conn)     #因为这个新建立的链接还没发数据过来,现在收就报错
                #所以要想实现这个客户端发数据server端知道,就需要让这个select再监测这个
                msg_dic[conn] = queue.Queue()    #为每个链接都建立一个队列,里面存返回给客户端数据
            else:       #代表有客户端发数据过来
                data = r.recv(1024)
                print("收到数据",data)
                msg_dic[r].put(data)
                outputs.append(r)   #放入返回的连接队列里
                # r.send(data)
                # print('send done')
        for w in writeable:
            data_to_client = msg_dic[w].get()
            w.send(data_to_client)      #返回给客户端原数据
            outputs.remove(w)           #确保下次循环的时候writeable,不返回这个已经处理完的链接啦
        for e in exceptional:          #有断开的连接就从各个列表中删除
            if e in outputs:
                outputs.remove(e)
            inputs.remove()
            del msg_dic[e]
    select实现单线程下的多并发: server端
    import socket
    HOST = 'localhost'      # The remote host
    PORT = 9999              # The same port as used by the server
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    print("s",s)
    s.connect((HOST, PORT))
    while True:
        msg = bytes(input(">>:"),encoding="utf8").strip()
        if len(msg) == 0:continue
        s.sendall(msg)
        data = s.recv(1024)
        print('Received', repr(data))
    s.close()
    select实现单线程下的多并发: client

        说明server.setblocking(False)  设置成非阻塞模式作用

            1)python默认情况下所有的socket都是blocking,即阻塞然后等待I/O操作完成,接收数据
            2)当使用协程实现单线程并发效果时需要设置成非租塞模式,不等待I/O操作

      4、使用selector模块实现单线程下的多并发效果

          作用:selector的实质是对select,poll,epoll的封装,他默认使用epoll,但是如果系统不支持就用select

    import selectors,socket
    
    sel = selectors.DefaultSelector()    #1 生成一个selector对象
    
    def accept(sock, mask):             #只要来一个新链接就调用accept
        conn, addr = sock.accept()      # 创建这个链接
        conn.setblocking(False)         #6 把这个链接设置为非阻塞模式
        sel.register(conn, selectors.EVENT_READ, read)    #只有把活动连接注册到sel中sel才会去检测它
        #7 把新建立的链接conn又放到selector注册对象sel里了,这时的回调函数变成read了
        #这时如果再活动就会调用read了,执行完accept后就会返回到events = sel.select()继续监测
    
    def read(conn, mask):               #2第二次卡住: 客户端连接成功就卡在这里,等待客户端发送数据
        try:                            #如果客户端断开后,收数据就会引发ConnectionResetError异常
            data = conn.recv(1024)      # Should be ready
            if data:
                conn.send(data)         # Hope it won't block
        except ConnectionResetError as e:
            print('closing', conn)
            sel.unregister(conn)        #取消注册,关闭链接
            conn.close()
    
    sock = socket.socket()
    sock.bind(('localhost', 9999))
    sock.listen(100)
    sock.setblocking(False)           #2 设置为非阻塞模式
    
    #3 将前面写的sock(server实例)注册到selselector对象中,让selector对象sel监测自己,自己活跃说明有链接或者发送数据
    sel.register(sock, selectors.EVENT_READ, accept)
    while True:                     #第一次有活动可定有新链接了,只要有新链接就会调用accept方法建立链接
        print("监测活跃:新链接或者已连接发送数据")
        events = sel.select()       #4 第一次卡住: 运行服务端就会卡在这里等待客户端连接
                                    # 这里虽然写的select但是可能是epoll看系统支持什么
                                    # 当有连接过来时,就会将连接实例赋值给events
        for key, mask in events:   # for循环这个events,默认是阻塞的,只要不阻塞肯定有新的连接
            callback = key.data     # 这里的key.data就是回调函数内存地址(accept或者read)
            callback(key.fileobj, mask) #5 key.fileobj是连接的socket实例conn和addr, mask=1 不知道什么
                                        #callback(key.fileobj, mask)是执行实例的回调函数
                                        #如果是新连接回调函数是accept函数,如果已连接发数据回调函数是read
    # 注:for循环中key包含以下内容
    # SelectorKey(                                            #fileobj是连接实例:conn,addr
    # fileobj = <socket.socket fd=320, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9998)>,
    # fd = 320,                                                #文件描述符
    # events = 1,
    # data = <function accept at 0x004D64B0>))                #回调函数内存地址
    selector实现单线程下的多并发: server端
    import socket
    HOST = 'localhost'      # The remote host
    PORT = 9999              # The same port as used by the server
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    print("s",s)
    s.connect((HOST, PORT))
    while True:
        msg = bytes(input(">>:"),encoding="utf8").strip()
        if len(msg) == 0:continue
        s.sendall(msg)
        data = s.recv(1024)
        print('Received', repr(data))
    s.close()
    selector实现单线程下的多并发: client端
    # 使用selector的几个关键步骤
    #     1)sel = selectors.DefaultSelector()  # 生成一个selector对象
    #     2)sock.setblocking(False)  # 设置为非阻塞模式
    #     3)sel.register(sock, selectors.EVENT_READ, accept)
    #     # 将前面写的sock(server实例)注册到selselector对象中,让selector对象sel监测,accept是回调函数
    #     4)events = sel.select()  # 1第一次卡住: 运行服务端就会卡在这里等待客户端连接
    #     5)callback(key.fileobj, mask)  # key.fileobj是连接的socket实例conn, mask=1 不知道什么东东
    #     6)sel.register(conn, selectors.EVENT_READ, read)  # 将回调函数从accept变成read
    #     当客户端第一次连接时会使用accept作为回调函数,连接成功后就使用read变成回调函数
    #     7)当执行完accept函数后就再次回到events = sel.select()
    #     第一次卡住的地方等待活跃数据
    #     8)如果此时活跃的是已经连接的客户端,会调用callback(key.fileobj, mask),因为客户端第一次连接的时候调用
    #     的是accept方法,执行了sel.register(conn, selectors.EVENT_READ, read),所以回调函数已经变成了read,所
    #     以如果是客户端发送数据过来调用的是read方法,而不是accept方法
    #     9)如果在第七步中活跃的是一个新连接,那么回调函数依然是accept,就会重复上面步骤建立一个新连接
    
    
    # 注:无论是新连接还旧链接发送数据过来,实质上没有太多区别,都是使用callback(key.fileobj, mask)调用回调函数
    #     1.但是如果是第一次连接回调函数是accept,在调用完accept后就将回调函数变成了read,执行完accept函
    #       后会回到第一次卡住的地方events = sel.select(),监测活跃的连接
    #     2.如果活跃的连接是已经连接的客户端发送数据,就会调用read函数去接收数据,运行完read后也会回到
    #       events = sel.select(),监测活跃的连接
    使用selector的几个关键步骤
  • 相关阅读:
    Spark Sort-Based Shuffle具体实现内幕和源码详解
    Spark-2.0原理分析-shuffle过程
    Spark Shuffle 中 JVM 内存使用及配置内幕详情
    Spark中的Spark Shuffle详解
    Spark Shuffle Write阶段磁盘文件分析
    Spark Sort Based Shuffle内存分析
    Spark Storage(二) 集群下的broadcast
    Spark SQL metaData配置到Mysql
    TweenJS----前端常用的js动画库,缓动画和复制动画的制作,效果非常好。
    Storm入门
  • 原文地址:https://www.cnblogs.com/jiaxinzhu/p/12483582.html
Copyright © 2011-2022 走看看