zoukankan      html  css  js  c++  java
  • 并发 ---- 6. IO 多路复用

    一.阻塞IO

      1.代码示例

       2.图形示例:

    二.非阻塞IO

      设置不阻塞(server.setblocking(False)),利用 try...except. 当被阻塞时, 执行except 事件, 

      1.代码示例:

    # 服务端
    import socket
    import time
    
    server=socket.socket()
    server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
    server.bind(('127.0.0.1',8083))
    server.listen(5)
    
    server.setblocking(False) #设置不阻塞
    r_list=[]  #用来存储所有来请求server端的conn连接
    w_list={}  #用来存储所有已经有了请求数据的conn的请求数据
    
    while 1:
        try:
            conn,addr=server.accept() #不阻塞,会报错
            r_list.append(conn)  #为了将连接保存起来,不然下次循环的时候,上一次的连接就没有了
        except BlockingIOError:
            # 强调强调强调:!!!非阻塞IO的精髓在于完全没有阻塞!!!
            # time.sleep(0.5) # 打开该行注释纯属为了方便查看效果
            print('在做其他的事情')
            print('rlist: ',len(r_list))
            print('wlist: ',len(w_list))
    
    
            # 遍历读列表,依次取出套接字读取内容
            del_rlist=[] #用来存储删除的conn连接
            for conn in r_list:
                try:
                    data=conn.recv(1024) #不阻塞,会报错
                    if not data: #当一个客户端暴力关闭的时候,会一直接收b'',别忘了判断一下数据
                        conn.close()
                        del_rlist.append(conn)
                        continue
                    w_list[conn]=data.upper()
                except BlockingIOError: # 没有收成功,则继续检索下一个套接字的接收
                    continue
                except ConnectionResetError: # 当前套接字出异常,则关闭,然后加入删除列表,等待被清除
                    conn.close()
                    del_rlist.append(conn)
    
    
            # 遍历写列表,依次取出套接字发送内容
            del_wlist=[]
            for conn,data in w_list.items():
                try:
                    conn.send(data)
                    del_wlist.append(conn)
                except BlockingIOError:
                    continue
    
    
            # 清理无用的套接字,无需再监听它们的IO操作
            for conn in del_rlist:
                r_list.remove(conn)
            #del_rlist.clear() #清空列表中保存的已经删除的内容
            for conn in del_wlist:
                w_list.pop(conn)
            #del_wlist.clear()
    
    #客户端
    import socket
    import os
    import time
    import threading
    client=socket.socket()
    client.connect(('127.0.0.1',8083))
    
    while 1:
        res=('%s hello' %os.getpid()).encode('utf-8')
        client.send(res)
        data=client.recv(1024)
    
        print(data.decode('utf-8'))
    
    
    ##多线程的客户端请求版本
    # def func():
    #     sk = socket.socket()
    #     sk.connect(('127.0.0.1',9000))
    #     sk.send(b'hello')
    #     time.sleep(1)
    #     print(sk.recv(1024))
    #     sk.close()
    #
    # for i in range(20):
    #     threading.Thread(target=func).start()
    
    非阻塞IO示例

       2.图形示例

       3.缺点:

    #1. 循环调用recv()将大幅度推高CPU占用率;这也是我们在代码中留一句time.sleep(2)的原因,
    否则在低配主机下极容易出现卡机情况

    #2. 任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,
    而任务可能在两次轮询之间的任意时间完成。这会导致整体数据吞吐量的降低。

    三. IO多路复用

       基本原理是: select/epoll这个function会不断的轮询所负责的所有socket,

            当某个socket有数据到达了,就通知用户进程。

       1.图形示例

      强调:

        1. 如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking 

    IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。     2. 在多路复用模型中,对于每一个socket,一般都设置成为non-blocking,

    但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,

    而不是被socket IO给block。

       2, python中的 select模块

    import select
    
    fd_r_list, fd_w_list, fd_e_list = select.select(rlist, wlist, xlist, [timeout])
    
    参数: 可接受四个参数(前三个必须)
        rlist: wait until ready for reading  #等待读的对象,你需要监听的需要获取数据的对象列表
        wlist: wait until ready for writing  #等待写的对象,你需要写一些内容的时候,input等等,
        也就是说我会循环他看看是否有需要发送的消息,如果有我取出这个对象的消息并发送出去,一般用不到,这里我们也给一个[]。
    xlist: wait for an “exceptional condition” #等待异常的对象,一些额外的情况,一般用不到,但是必须传,那么我们就给他一个[]。 timeout: 超时时间 当超时时间 = n(正整数)时,那么如果监听的句柄均无任何变化,则select会阻塞n秒,之后返回三个空列表,如果监听的句柄有变化,则直接执行。 返回值:三个列表与上面的三个参数列表是对应的   select方法用来监视文件描述符(当文件描述符条件不满足时,select会阻塞),当某个文件描述符状态改变后,会返回三个列表 1、当参数1 序列中的fd满足“可读”条件时,则获取发生变化的fd并添加到fd_r_list中 2、当参数2 序列中含有fd时,则将该序列中所有的fd添加到 fd_w_list中 3、当参数3 序列中的fd发生错误时,则将该发生错误的fd添加到 fd_e_list中 4、当超时时间为空,则select会一直阻塞,直到监听的句柄发生变化

       3.代码示例

    #服务端
    from socket import *
    import select
    server = socket(AF_INET, SOCK_STREAM)
    server.bind(('127.0.0.1',8093))
    server.listen(5)
    # 设置为非阻塞
    server.setblocking(False)
    
    # 初始化将服务端socket对象加入监听列表,后面还要动态添加一些conn连接对象,
    当accept的时候sk就有感应,当recv的时候conn就有动静
    rlist=[server,] rdata = {} #存放客户端发送过来的消息 wlist=[] #等待写对象 wdata={} #存放要返回给客户端的消息 print('预备!监听!!!') count = 0 #写着计数用的,为了看实验效果用的,没用 while True: # 开始 select 监听,对rlist中的服务端server进行监听,select函数阻塞进程,直到rlist中的套接字被触发
    (在此例中,套接字接收到客户端发来的握手信号,从而变得可读,满足select函数的“可读”条件),被触发的(有动静的)
    套接字(服务器套接字)返回给了rl这个返回值里面;
    rl,wl,xl=select.select(rlist,wlist,[],0.5) print('%s 次数>>'%(count),wl) count = count + 1 # 对rl进行循环判断是否有客户端连接进来,当有客户端连接进来时select将触发 for sock in rl: # 判断当前触发的是不是socket对象, 当触发的对象是socket对象时,说明有新客户端accept连接进来了 if sock == server: # 接收客户端的连接, 获取客户端对象和客户端地址信息 conn,addr=sock.accept() #把新的客户端连接加入到监听列表中,当客户端的连接有接收消息的时候,select将被触发,
    会知道这个连接有动静,有消息,那么返回给rl这个返回值列表里面。
    rlist.append(conn) else: # 由于客户端连接进来时socket接收客户端连接请求,将客户端连接加入到了监听列表中(rlist),
    客户端发送消息的时候这个连接将触发
    # 所以判断是否是客户端连接对象触发 try: data=sock.recv(1024) #没有数据的时候,我们将这个连接关闭掉,并从监听列表中移除 if not data: sock.close() rlist.remove(sock) continue print("received {0} from client {1}".format(data.decode(), sock)) #将接受到的客户端的消息保存下来 rdata[sock] = data.decode() #将客户端连接对象和这个对象接收到的消息加工成返回消息,并添加到wdata这个字典里面 wdata[sock]=data.upper() #需要给这个客户端回复消息的时候,我们将这个连接添加到wlist写监听列表中 wlist.append(sock) #如果这个连接出错了,客户端暴力断开了(注意,我还没有接收他的消息,或者接收他的消息的过程中出错了) except Exception: #关闭这个连接 sock.close() #在监听列表中将他移除,因为不管什么原因,它毕竟是断开了,没必要再监听它了 rlist.remove(sock) # 如果现在没有客户端请求连接,也没有客户端发送消息时,开始对发送消息列表进行处理,是否需要发送消息 for sock in wl: sock.send(wdata[sock]) wlist.remove(sock) wdata.pop(sock) # #将一次select监听列表中有接收数据的conn对象所接收到的消息打印一下 # for k,v in rdata.items(): # print(k,'发来的消息是:',v) # #清空接收到的消息 # rdata.clear() --------------------------------------- #客户端 from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8093)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) data=client.recv(1024) print(data.decode('utf-8')) client.close() select网络IO模型的示例代码

       4.优缺点

      ①优点:

    #相比其他模型,使用select() 的事件驱动模型只用单线程(进程)执行,占用资源少,

    不消耗太多 CPU,同时能够为多客户端提供服务。如果试图建立一个简单的事件驱动的服务器程序,

    这个模型有一定的参考价值。

       ②缺点:

    #首先select()接口并不是实现“事件驱动”的最好选择。因为当需要探测的句柄值较大时,
      select()接口本身需要消耗大量时间去轮询各个句柄。很多操作系统提供了更为高效的接口,
      如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。如果需要实现更高效的服务器程序,
      类似epoll这样的接口更被推荐。遗憾的是不同的操作系统特供的epoll接口有很大差异,所以使用类似于epoll的
      接口实现具有较好跨平台能力的服务器会比较困难。

    #其次,该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。
  • 相关阅读:
    js 文件的操作
    js重点基础知识 以及小案例_最简单的轮播图 简单的动态表格( encodeURIComponent()编码比 encodeURI()编码)
    2阶——数据库连接池 c3p0 , druid, dbcp (其实所有的连接池都实现了dataSource接口,就可以调用getconnection方法)
    2阶——JDBC,JDBCTemplate(操作数据库)
    vue + django 批量删除
    简单的模糊搜索 Vue + django
    vue 父子组件传参简单的分页
    vue 多对多反序列化上传图片
    模型里的 抽象类 表继承
    django 多对多反序列添加
  • 原文地址:https://www.cnblogs.com/sc-1067178406/p/10513450.html
Copyright © 2011-2022 走看看