zoukankan      html  css  js  c++  java
  • Python day39 :非阻塞IO模型 select /epoll 及使用方法

    ## 非阻塞IO及多路复用:
    
    ```python
    存在的问题:
    ​    当执行到recv时,如果对象并没有发送数据,程序阻塞了,无法执行其他任务
    解决方案:
    ​1.多线程或多进程,
    ​         当客户端并发量非常大的时候,服务器可能就无法开启新的线程或进程,如果不对数量加以限制 服务器就崩溃了 
    ​2.    线程池或进程池
    ​        首先限制了数量 保证服务器正常运行,但是问题是,如果客户端都处于阻塞状态,这些线程也阻塞了 
    ​3.    协程:
    ​        使用一个线程处理所有客户端,当一个客户端处于阻塞状态时可以切换至其他客户端任务   
    '当单个任务耗时较长时,协程效率反而不高了(cpu切换影响效率)
    ```
    
    ## 非阻塞IO模型:
    
    ```python
    阻塞IO模型在执行recv 和 accept  时 都需要经历wait_data  
    ​    非阻塞IO即 在执行recv 和accept时 不会阻塞 可以继续往下执行 
    ​    如何使用:
    ​        将server的blocking设置为False 即设置非阻塞  
    ​    存在的问题 :
    ​        这样一来 你的进程 效率 非常高 没有任何的阻塞 
    ​        很多情况下 并没有数据需要处理,但是我们的进程也需要不停的询问操作系统  会导致CPU占用过高
    ​        而且是无意义的占用  
    案例:(服务端)
    from socket import *
    server = socket()
    server.bind(("127.0.0.1", 1688))
    server.listen(5)
    server.setblocking(False)
    
    clients = []  # 用来存储客户端socket对象的列表
    msgs = []  # 用来存储要发送的数据和客户端socket对象
    
    # 连接客户端的循环
    while True:
        try:
            client, addr = server.accept()  # 接受三次握手信息
            # 不报错,说明连接成功,执行下面代码
            print("连接一个客户端:%s" % addr[1])
            clients.append(client)
            # 不在这里执行recv,是因为建立连接和接受数据不是同时进行的,
            # 在这里可能还没有接受,报错,后续也不会再接受,一直存放在操作系统
            # 内存中,没有取的操作.
        except BlockingIOError as e:
            print("无人连接")
            # 收到数据的操作,
            for c in clients[:]:  # 遍历列表做删除操作,会出错,故切片成一个新的列表
                try:
                    data = c.recv(2048)
                    if not data:  # 主动断开连接
                        c.close()
                        clients.remove(c)
                    # 这里不执行send,是因为如果这时缓存区满了,报IO错,也就是,接受完毕
                    # 再次接受将报IO错(操作系统内存被recv之后将为空,等待接受状态),但是还没有发送成功,
                    # 就被从列表中删除了
                    msgs.append((c, data))
                except BlockingIOError as e:
                    print("此客户端还没有发送数据,不需要接受")
                except ConnectionResetError:
                    print("次客户端断开链接")
                    c.close()
                    clients.remove(c)
            # 发送数据操作
            for item in msgs[:]:
                try:
                    c, msg = item
                    c.send(msg.upper())
                    msgs.remove(item)  # 如果发送成功,删除数据.失败的话,下次遍历继续发送
                except BlockingIOError:
                    print("发送IO阻塞导致失败")
        print("over一次")
    客户端:
    import os
    from socket import *
    client = socket()
    client.connect(("127.0.0.1", 1688))
    while True:
        msg = input("msg:")
        if not msg: continue
        client.send(msg.encode("utf-8"))
        print(client.recv(2048).decode("utf-8"))
    
    ```
    
    ## 多路复用IO模型:select模块的应用:
    
    ```python
    什么是多路复用:
        本质是多路共用,多个连接共用一个select阻塞,或者说多个连接共用一个线程.因此select试用多连接而不适单连接.
    在多路复用模型中,对于每一个socket,一般都设置成为non-blocking,但是,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。
    select多路复用服务端:
        import socket
    import select
    server = socket.socket()
    server.bind(("127.0.0.1",1688))
    server.listen()
    server.setblocking(False)
    rlist = [server,]  # 将需要检测(是否可读==recv)的socket对象放到该列表中
                       # accept也是一个读数据操作,默认也会阻塞  也需要让select来检测
    
    # 注意 select最多能检测1024个socket 超出直接报错    这是select自身设计的问题    最终的解决方案epoll
    
    wlist = []  # 将需要检测(是否可写==send)的socket对象放到该列表中
                # 只要缓冲区不满都可以写
    msgs = [("socket","msg")]  # 存储需要发送的数据  等待select 检测后 在进行发送
    
    print("start")
    while True:
        readable_list,writeable_list,_ = select.select(rlist,wlist,[])  # 会阻塞等到 有一个或多个socket 可以被处理
        print("%s个socket可读" % len(readable_list),"%s个socket可写" % len(writeable_list))
    
        """
        readable_list 中存储的是已经可以读取数据的socket对象   可能是服务器 可能是客户端
        """
        # 处理可读列表
        for soc in readable_list:
            if soc == server:
                # 服务器的处理
                client,addr = server.accept()
                #将新连接的socket对象 加入到待检测列表中
                rlist.append(client)
            else:
                try:
                    # 客户端的处理
                    data = soc.recv(2048)
                    if not data:
                        soc.close()
                        rlist.remove(soc)   # 如果对方下线  关闭socket 并且从待检测列表中删除
                        continue
                    # 不能直接发  因为此时缓冲区可能已经满了    导致send阻塞住, 所以要发送数据前一个先这个socket交给select来检查
                    # soc.send(data.upper())
                    if soc not in wlist:
                        wlist.append(soc)
                    # 将要发送的数据先存起来
                    msgs.append((soc,data))
                except ConnectionResetError:
                    soc.close()
                    # 对方下线后 应该从待检测列表中删除 socket
                    rlist.remove(soc)
                    wlist.remove(soc)
        # 处理可写列表
        for soc in writeable_list:
            # 由于一个客户端可能有多个数据要发送 所以遍历所有客户端
            for i in msgs[:]:
                if i[0] == soc:
                    soc.send(i[1])
                    # 发送成功  将这个数据从列表中删除
                    msgs.remove(i)
            # 数据已经都发给客户端   这个socket还需不需要检测是否可写,必须要删除
            wlist.remove(soc) # 否则 只要缓冲区不满 一直处于可写 导致死循环
    print("over")
    ```
    
    
    
    ## 多路复用 select 进阶epol模型:
    
    ```python
    1.select,需要遍历socket列表,频繁的对等待队列进行添加移除操作,
    2.数据到达后还需要给遍历所有socket才能获知哪些socket有数据
    两个操作消耗的时间随着要监控的socket的数量增加而大大增加,
    处于效率考虑才规定了最大只能监视1024个socket
    ## epol l要解决的问题
    #1.避免频繁的对等待队列进行操作 
    #2.避免遍历所有socket
    #对于第一个问题epoll,采取的方案是,将对等待队列的维护和,阻塞进程这两个操作进行拆分,
    #第二个问题是select中进程无法获知哪些socket是有数据的所以需要遍历
    epol为了解决这个问题,在内核中维护了一个就绪列表
    1.创建epoll对象,epoll也会对应一个文件,由文件系统管理
    2.执行register时,将epoll对象 添加到socket的等待队列中
    3.数据到达后,CPU执行中断程序,将数据copy给socket
    4.在epoll中,中断程序接下来会执行epoll对象中的回调函数,传入就绪的socket对象 
    5.将socket,添加到就绪列表中 
    6.唤醒epoll等待队列中的进程,
    进程唤醒后,由于存在就绪列表,所以不需要再遍历socket了,直接处理就绪列表即可 
    解决了这两个问题后,并发量得到大幅度提升,最大可同时维护上万级别的socket
    
    ```
    
    ## epoll相关函数:
    
    ```python
    import select 导入select模块
    epoll = select.epoll() 创建一个epoll对象
    epoll.register(文件句柄,事件类型) 注册要监控的文件句柄和事件
    事件类型:
      select.EPOLLIN    可读事件
    
      select.EPOLLOUT   可写事件
    
      select.EPOLLERR   错误事件
    
      select.EPOLLHUP   客户端断开事件
    
    epoll.unregister(文件句柄)   销毁文件句柄
    
    epoll.poll(timeout)  当文件句柄发生变化,则会以列表的形式主动报告给用户进程,timeout
    
                         为超时时间,默认为-1,即一直等待直到文件句柄发生变化,如果指定为1
    
                         那么epoll每1秒汇报一次当前文件句柄的变化情况,如果无变化则返回空
    
    epoll.fileno() 返回epoll的控制文件描述符(Return the epoll control file descriptor)
    
    epoll.modfiy(fineno,event) fineno为文件描述符 event为事件类型  作用是修改文件描述符所对应的事件
    
    epoll.fromfd(fileno) 从1个指定的文件描述符创建1个epoll对象
    
    epoll.close()   关闭epoll对象的控制文件描述符
    
    ```
    
    ## epoll案例:服务器
    
    ```python
    # coding:utf-8
    import socket, select
    server = socket.socket()
    server.bind(("127.0.0.1", 1688))
    server.listen(5)
    msgs = []
    fd_socket = {server.fileno(): server}
    epoll = select.epoll()
    # 注册服务器的 写就绪
    epoll.register(server.fileno(), select.EPOLLIN)
    
    while True:
        for fd, event in epoll.poll():
            sock = fd_socket[fd]
            print(fd, event)
            # 返回的是文件描述符 需要获取对应socket
            if sock == server:  # 如果是服务器 就接受请求
                client, addr = server.accept()
                # 注册客户端写就绪
                epoll.register(client.fileno(), select.EPOLLIN)
                # 添加对应关系
                fd_socket[client.fileno()] = client
            # 读就绪
            elif event == select.EPOLLIN:
                data = sock.recv(2018)
                if not data:
                    # 注销事件
                    epoll.unregister(fd)
                    # 关闭socket
                    sock.close()
                    # 删除socket对应关系
                    del fd_socket[fd]
                    print(" somebody fuck out...")
                    continue
    
                print(data.decode("utf-8"))
                # 读完数据 需要把数据发回去所以接下来更改为写就绪=事件
                epoll.modify(fd, select.EPOLLOUT)
                #记录数据
                msgs.append((sock,data.upper()))
            elif event == select.EPOLLOUT:
                for item in msgs[:]:
                    if item[0] == sock:
                        sock.send(item[1])
                        msgs.remove(item)
                # 切换关注事件为写就绪
                epoll.modify(fd,select.EPOLLIN)
    ```
    
    ## 客户端
    
    ```python
    #coding:utf-8
    #客户端
    #创建客户端socket对象
    import socket
    clientsocket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    #服务端IP地址和端口号元组
    server_address = ('127.0.0.1',1688)
    #客户端连接指定的IP地址和端口号
    clientsocket.connect(server_address)
    
    while True:
        #输入数据
        data = raw_input('please input:')
        if data == "q":
            break
        if not data:
          continue
        #客户端发送数据
        clientsocket.send(data.encode("utf-8"))
        #客户端接收数据
        server_data = clientsocket.recv(1024)
        print ('客户端收到的数据:',server_data)
    #关闭客户端socket
    clientsocket.close()
    
    ```
  • 相关阅读:
    HashTable源码浅析(基于jdk1.8.0_231)
    LinkedHashMap源码浅析(基于jdk1.8.0_231)
    SortedSet接口源码浅析(基于jdk1.8.0_231)
    NavigableSet接口源码浅析(基于jdk1.8.0_231)
    TreeSet源码浅析(基于jdk1.8.0_231)
    TreeMap源码浅析(基于jdk1.8.0_231)
    Map接口源码解析(基于jdk1.8.0_231)
    Arrays工具类源码详解(基于jdk1.8.0_231)
    Collections源码详解(基于jdk1.8.0_231)
    BitSet源码详解 (基于jdk1.8.0.231)
  • 原文地址:https://www.cnblogs.com/huhongpeng/p/11042288.html
Copyright © 2011-2022 走看看