zoukankan      html  css  js  c++  java
  • 协程IO多路复用

    协程:单线程下实现并发
    并发:伪并行,遇到IO就切换,单核下多个任务之间切换执行,给你的效果就是貌似你的几个程序在同时执行.提高效率
    任务切换 + 保存状态
    并行:多核cpu,真正的同时执行
    串行:一个任务执行完在执行另外一个任务

    多线程多进程下的任务切换+保存状态是操作系统

    协程:单线程下实现并发,最大化线程的效率,检测IO并自动切换,程序级别的任务切换,
    之前多线程多进程都是系统级别的切换,程序级别的切换比系统要快很多

    greenlet(鬼木雷特) 任务切换 + 保存状态,没有实现IO自动切换,
    switch(思维吃)
      #真正的协程模块就是使用greenlet完成的切换
    import time
    from greenlet import greenlet
    
    def eat(name):
        print('%s eat 1' %name)  #2
        time.sleep(3)
        g2.switch('taibai')   #3
        print('%s eat 2' %name) #6
        g2.switch() #7
    def play(name):
        print('%s play 1' %name) #4
        time.sleep(3)
        g1.switch()      #5
        print('%s play 2' %name) #8
    
    g1=greenlet(eat)
    g2=greenlet(play)
    
    g1.switch('taibai')#可以在第一次switch时传入参数,以后都不需要
    

    分析:单纯的协程没有意义,反而可能会让性能降低,那么协程的存在意义在哪里呢?结合上面单线程实现并发的示例,思考一下假如当我们执行了一段代码后遇到IO操作,此时我们不再等待,而是切换到另一段代码去执行,然后遇到IO操作的时候再去切换,这样是不是也能提高性能,实现并发,但是greenlet只能做协程,不能实现遇到IO就切换,所以协程如果再加上遇到IO就切换,那么便能实现单线程并发了。那么谁能做到遇到IO就切换呢?那就是另外一个模块geven,安装方法:pip3 install gevent。

            gevent内部要依赖greenlet,也就是greenlet + IO切换,所以gevent就牛逼了!写法如下:

    gevent(这森特)

    任务切换 + 保存状态,实现了IO自动切换,并且通过monkey 能够识别到基本上所有的IO操作.

    import gevent
    from gevent import monkey;monkey.patch_all()    monkey(猴子,忙k)   patch(怕吃)ang
    import time
    
    def eat(name):
        print('%s eat 1' %name)
        # gevent.sleep(2)
        time.sleep(2)
    
        print('%s eat 2' %name)
    
    def play(name):
        print('%s play 1' %name)
        # gevent.sleep(2)
        time.sleep(2)#需要引入模块才可以用time
        print('%s play 2' %name)
    
    g1=gevent.spawn(eat,'egon') #异步执行这个eat任务,后面egon就是给他传的参数    #spawn(四棒的)
    g2=gevent.spawn(play,name='egon')
    
    gevent.joinall([g1,g2])  
    
    print('主')
    

     上面通过gevent实现了单线程并发,提高了效率,通过对比,我们发现,上面IO多路复用的示例中是一个线程在不停的执行,而是gevent是在代码间进行切换,虽然原理不行,但是都提高了效率,实现单线程并发。

    总结:

           1、协程可以提高并发吗?

                  协程自己本身无法实现并发,甚至性能会降低,而协程+IO切换性能就可以提高了。

           2、单线程提高并发的方法有哪些?

                  a、协程+遇到就IO切换:gevent;   注意:不是异步,无回调函数,但本质也是基于事件循环

                  b、基于时间循环的异步非阻塞框架:Twisted;

           3、线程、进程、协程的区别?

        进程cpu资源分配的最小单元,主要用来做数据隔离,那么线程是cpu工作的最小单元,一个应用程序可以有多个进程(默认有一个),一个进程可以有多个线程(默认有一个),这是它们的一个简单区别;基本上在其他语言中没有进程这个概念,大都用线程,而在python中由于有GIL锁,它保证了同一时刻一个进程中只能有一个线程被cpu调度,为了利用多核优势就要创建多个进程,多线程没有用,所以计算密集型的用多进程,IO密集型的用多线程就行,因为IO操作不占用CPU。而协程是程序员人为创造出来的不真实存在的,它可以让程序员控制代码执行顺序,在函数之间来回切换,本身协程存在没有意义,但是能跟IO切换放在一起就厉害了,相当于将线程切片,程序遇到IO就切换到其他代码,IO完成后再切回来,达到让线程不停去工作的效果,实现协程的模块是greenlet,实现协程+IO切换的模块是gevent,这就是三者的区别。

     

    IO多路复用 

    1. 阻塞IO模型:

    平常写的都是recv,send都是阻塞

    所以,blocking IO的特点就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)都被block了。

      这里我们回顾一下同步/异步/阻塞/非阻塞:

        同步:提交一个任务之后要等待这个任务执行完毕

        异步:只管提交任务,不等待这个任务执行完毕就可以去做其他的事情

        阻塞:recv、recvfrom、accept,线程阶段  运行状态-->阻塞状态-->就绪

        非阻塞:没有阻塞状态

    2.非阻塞IO模型

    服务端

    setblocking(赛特老跟)

    import socket
    import time
    
    server=socket.socket()
    server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
    server.bind(('127.0.0.1',8083))
    server.listen(5)
    print('你看看卡在哪')
    server.setblocking(False)
    rlist = []
    rl = []
    while 1:
        try:
            conn, addr = server.accept()
            print(addr)
            rlist.append(conn)
            print('来自%s:%s的链接请求'%(addr[0],addr[1]))
        except BlockingIOError:
            print('去买点药')
    
        # time.sleep(0.1)
        print('rlist',rlist,len(rlist))
        for con in rlist:
            try:
                from_client_msg = con.recv(1024)
            except BlockingIOError:
                continue
            except ConnectionResetError:
                con.close()
                rl.append(con)
        print('>>>>',rl)
        for remove_con in rl:
            rlist.remove(remove_con)
        rl.clear()
    非阻塞IO的socket
    import socket
    import time
    
    server=socket.socket()
    server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
    server.bind(('127.0.0.1',8083))
    server.listen(5)
    
    server.setblocking(False) #设置不阻塞
    r_list=[]  #用来存储所有来请求server端的conn连接
    w_list={}  #用来存储所有已经有了请求数据的conn的请求数据
    
    while 1:
        try:
            conn,addr=server.accept() #不阻塞,会报错
            r_list.append(conn)  #为了将连接保存起来,不然下次循环的时候,上一次的连接就没有了
        except BlockingIOError:
            # 强调强调强调:!!!非阻塞IO的精髓在于完全没有阻塞!!!
            # time.sleep(0.5) # 打开该行注释纯属为了方便查看效果
            print('在做其他的事情')
            # print('rlist: ',len(r_list))
            # print('wlist: ',len(w_list))
    
    
            # 遍历读列表,依次取出套接字读取内容
            del_rlist=[] #用来存储删除的conn连接
            for conn in r_list:
    
                try:
                    data=conn.recv(1024) #不阻塞,会报错
                    if not data: #当一个客户端暴力关闭的时候,会一直接收b'',别忘了判断一下数据
                        conn.close()
                        del_rlist.append(conn)
                        continue
                    w_list[conn]=data.upper()
    
    
                except BlockingIOError: # 没有收成功,则继续检索下一个套接字的接收
                    continue
                except ConnectionResetError: # 当前套接字出异常,则关闭,然后加入删除列表,等待被清除
                    conn.close()
                    del_rlist.append(conn)
    
    
            # 遍历写列表,依次取出套接字发送内容
            del_wlist=[]
            for conn,data in w_list.items():
                try:
                    conn.send(data)
                    del_wlist.append(conn)
                except BlockingIOError:
                    continue
    
    
            # 清理无用的套接字,无需再监听它们的IO操作
            for conn in del_rlist:
                r_list.remove(conn)
            #del_rlist.clear() #清空列表中保存的已经删除的内容
            for conn in del_wlist:
                w_list.pop(conn)
            #del_wlist.clear()
    
    
    
    
    
    
    
    ################################################################################
    #作业讲解
    
    from threading import Thread
    import socket
    
    class MyServer():
    
        def __init__(self,ip_port):
            # super().__init__()
            self.ip_port = ip_port
            self.qianxi()
        def qianxi(self):
            self.socket = socket.socket()
            self.socket.bind(self.ip_port)
            self.socket.listen()
            self.run()
        def run(self):
            while 1:
                self.conn, self.addr = self.socket.accept()
                self.thread_start(self.conn)
    
        def thread_start(self,conn):
            t = Thread(target=self.recv_data,args=(conn,))
            t.start()
            return self.conn
        def recv_data(self,conn):
            from_client_msg = conn.recv(1024).decode('utf-8')
            print('>>>>>',from_client_msg)
            conn.send('来玩啊,帅哥!'.encode('utf-8'))
    
    if __name__ == '__main__':
        ip_port = ('127.0.0.1',8001)
        MyServer(ip_port,)
    完整版非阻塞IO模型服务端

    客户端

    import socket
    import time
    
    ip_port = ('127.0.0.1',8083)
    
    client = socket.socket()
    
    client.connect(ip_port)
    
    while 1:
    
        client.send(b'dayangge henweisuo ')
        time.sleep(0.1)
    非阻塞IO的socket

    多路复用IO

    import select
    
    fd_r_list, fd_w_list, fd_e_list = select.select(rlist, wlist, xlist, [timeout])
    
    参数: 可接受四个参数(前三个必须)
        rlist: wait until ready for reading  #等待读的对象,你需要监听的需要获取数据的对象列表
        wlist: wait until ready for writing  #等待写的对象,你需要写一些内容的时候,input等等,也就是说我会循环他看看是否有需要发送的消息,如果有我取出这个对象的消息并发送出去,一般用不到,这里我们也给一个[]。
        xlist: wait for an “exceptional condition”  #等待异常的对象,一些额外的情况,一般用不到,但是必须传,那么我们就给他一个[]。
        timeout: 超时时间
        当超时时间 = n(正整数)时,那么如果监听的句柄均无任何变化,则select会阻塞n秒,之后返回三个空列表,如果监听的句柄有变化,则直接执行。
    返回值:三个列表与上面的三个参数列表是对应的
      select方法用来监视文件描述符(当文件描述符条件不满足时,select会阻塞),当某个文件描述符状态改变后,会返回三个列表
        1、当参数1 序列中的fd满足“可读”条件时,则获取发生变化的fd并添加到fd_r_list中
        2、当参数2 序列中含有fd时,则将该序列中所有的fd添加到 fd_w_list中
        3、当参数3 序列中的fd发生错误时,则将该发生错误的fd添加到 fd_e_list中
        4、当超时时间为空,则select会一直阻塞,直到监听的句柄发生变化
    

     

     select(色来可特)模块

     setblocking(赛特老跟)

        

    #服务端
    from socket import *
    import select
    server = socket(AF_INET, SOCK_STREAM)
    server.bind(('127.0.0.1',8093))
    server.listen(5)
    # 设置为非阻塞
    server.setblocking(False)
    
    # 初始化将服务端socket对象加入监听列表,后面还要动态添加一些conn连接对象,当accept的时候sk就有感应,当recv的时候conn就有动静
    rlist=[server,]
    rdata = {}  #存放客户端发送过来的消息
    
    wlist=[]  #等待写对象
    wdata={}  #存放要返回给客户端的消息
    
    print('预备!监听!!!')
    count = 0 #写着计数用的,为了看实验效果用的,没用
    while True:
        # 开始 select 监听,对rlist中的服务端server进行监听,select函数阻塞进程,直到rlist中的套接字被触发(在此例中,套接字接收到客户端发来的握手信号,从而变得可读,满足select函数的“可读”条件),被触发的(有动静的)套接字(服务器套接字)返回给了rl这个返回值里面;
        rl,wl,xl=select.select(rlist,wlist,[],0.5)
        print('%s 次数>>'%(count),wl)
        count = count + 1
        # 对rl进行循环判断是否有客户端连接进来,当有客户端连接进来时select将触发
        for sock in rl:
            # 判断当前触发的是不是socket对象, 当触发的对象是socket对象时,说明有新客户端accept连接进来了
            if sock == server:
                # 接收客户端的连接, 获取客户端对象和客户端地址信息
                conn,addr=sock.accept()
                #把新的客户端连接加入到监听列表中,当客户端的连接有接收消息的时候,select将被触发,会知道这个连接有动静,有消息,那么返回给rl这个返回值列表里面。
                rlist.append(conn)
            else:
                # 由于客户端连接进来时socket接收客户端连接请求,将客户端连接加入到了监听列表中(rlist),客户端发送消息的时候这个连接将触发
                # 所以判断是否是客户端连接对象触发
                try:
                    data=sock.recv(1024)
                    #没有数据的时候,我们将这个连接关闭掉,并从监听列表中移除
                    if not data:
                        sock.close()
                        rlist.remove(sock)
                        continue
                    print("received {0} from client {1}".format(data.decode(), sock))
                    #将接受到的客户端的消息保存下来
                    rdata[sock] = data.decode()
    
                    #将客户端连接对象和这个对象接收到的消息加工成返回消息,并添加到wdata这个字典里面
                    wdata[sock]=data.upper()
                    #需要给这个客户端回复消息的时候,我们将这个连接添加到wlist写监听列表中
                    wlist.append(sock)
                #如果这个连接出错了,客户端暴力断开了(注意,我还没有接收他的消息,或者接收他的消息的过程中出错了)
                except Exception:
                    #关闭这个连接
                    sock.close()
                    #在监听列表中将他移除,因为不管什么原因,它毕竟是断开了,没必要再监听它了
                    rlist.remove(sock)
        # 如果现在没有客户端请求连接,也没有客户端发送消息时,开始对发送消息列表进行处理,是否需要发送消息
        for sock in wl:
            sock.send(wdata[sock])
            wlist.remove(sock)
            wdata.pop(sock)
    
        # #将一次select监听列表中有接收数据的conn对象所接收到的消息打印一下
        # for k,v in rdata.items():
        #     print(k,'发来的消息是:',v)
        # #清空接收到的消息
        # rdata.clear()
    
    ---------------------------------------
    #客户端
    from socket import *
    
    client=socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',8093))
    
    
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
        client.send(msg.encode('utf-8'))
        data=client.recv(1024)
        print(data.decode('utf-8'))
    
    client.close()
    

    基于IO多路复用+socket实现单线程并发

    # ################ 解决并发:单线程+IO不等待 ################
      import socket
      import select
    
      client1 = socket.socket()
      client1.setblocking(False) # 将原来阻塞的位置变成非阻塞(报错)
      try:
          client1.connect(('www.baidu.com',80))
      except BlockingIOError as e:
          pass
    
      client2 = socket.socket()
      client2.setblocking(False)  # 将原来阻塞的位置变成非阻塞(报错)
      try:
          client2.connect(('www.sogou.com',80))
      except BlockingIOError as e:
          pass
    
      client3 = socket.socket()
      client3.setblocking(False)  # 将原来阻塞的位置变成非阻塞(报错)
      try:
          client3.connect(('www.sina.com.cn',80))
      except BlockingIOError as e:
          pass
    
      socket_list = [client1,client2,client3]
      conn_list = [client1,client2,client3]
    
      while True:
          rlist,wlist,elist = select.select(socket_list,conn_list,[],0.005)
          # rlist中表示已经接收到数据的socket对象   
          # wlist中表示已经连接成功的socket对象
          for sk in wlist:
              if sk == client1:
                  sk.sendall(b'GET /s?wd=alex HTTP/1.0
    host:www.baidu.com
    
    ')
              elif sk == client2:
                  sk.sendall(b'GET /web?query=fdf HTTP/1.0
    host:www.sogou.com
    
    ')
              else:
                  sk.sendall(b'GET /mid/search.shtml?q=alex HTTP/1.0
    host:www.sina.com.cn
    
    ')
              conn_list.remove(sk)
          for sk in rlist:
              chunk_list = []
              while True:
                  try:
                      chunk = sk.recv(8096)
                      if not chunk:
                          break
                      chunk_list.append(chunk)
                  except BlockingIOError as e:
                      break
              body = b''.join(chunk_list)
              print('------------>',body)
              sk.close()
              socket_list.remove(sk)
          if not socket_list:
              break
    记着看

    上面示例可以进行封装,但是封装前先来看这样两段代码:

        # 代码一:
        v = [
            [11,22], # 每个都有一个append方法
            [22,33], # 每个都有一个append方法
            [33,44], # 每个都有一个append方法
        ]
        for item in v:
            print(item.append)
    

      

        # 代码二(为了不改变for循环代码,可以进行如下封装)
        class Foo(object):
            def __init__(self,data):
                self.row = data
    
            def append(self,item):
                self.row.append(item)
        v = [
            Foo([11,22]), # 每个都有一个append方法
            Foo([22,33]), # 每个都有一个append方法
            Foo([33,44]), # 每个都有一个append方法
        ]
    
        for item in v:
            print(item.append)
    

      

    # ############## 单线程并发高级版:封装上面示例 ##############
        import socket
        import select
    
        class Req(object):
            def __init__(self,sk,func):
                self.sock = sk
                self.func = func
    
            def fileno(self):
                return self.sock.fileno()
    
        class Nb(object):
            def __init__(self):
                self.conn_list = []
                self.socket_list = []
    
            def add(self,url,func):
                client = socket.socket()
                client.setblocking(False)  # 非阻塞
                try:
                    client.connect((url, 80))
                except BlockingIOError as e:
                    pass
                obj = Req(client,func)
                self.conn_list.append(obj)
                self.socket_list.append(obj)
    
            def run(self):
                while True:
                    rlist,wlist,elist = select.select(self.socket_list,self.conn_list,[],0.005)
                    for sk in wlist:
                        # 发生变换的req对象
                        sk.sock.sendall(b'GET /s?wd=alex HTTP/1.0
    host:www.baidu.com
    
    ')
                        self.conn_list.remove(sk)
                    for sk in rlist:
                        chunk_list = []
                        while True:
                            try:
                                chunk = sk.sock.recv(8096)
                                if not chunk:
                                    break
                                chunk_list.append(chunk)
                            except BlockingIOError as e:
                                break
                        body = b''.join(chunk_list)
                        sk.func(body)
                        sk.sock.close()
                        self.socket_list.remove(sk)
                    if not self.socket_list:
                        break
    
        def baidu_repsonse(body):
            print('百度下载结果:',body)
    
        def sogou_repsonse(body):
            print('搜狗下载结果:', body)
    
        def sina_repsonse(body):
            print('新浪下载结果:', body)
    
        t1 = Nb()
        t1.add('www.baidu.com',baidu_repsonse)
        t1.add('www.sogou.com',sogou_repsonse)
        t1.add('www.sina.com.cn',sina_repsonse)
        t1.run()
    封装版

    总结:

           1、socket默认是否是阻塞的?阻塞体现在哪里?

                  是,体现在等待连接和等待接收数据。

           2、如何让socket编程非阻塞?

                  通过设置client.setblocking(False)

           3、IO多路复用作用?

                  检测多个socket是否已经发生变化(是否已经连接成功/是否已经获取数据)(可写/可读)

                  操作系统检测socket是否发生变化,有三种模式:

                         select:最多1024个socket,循环去检测;

                         poll:不限制监听socket个数,循环去检测(水平触发);

                         epoll:不限制监听socket个数,回调方式(边缘触发);

                  Python模块:

                         select.select

                         select.epoll(windows不支持,linux中可以用)

           4、提高并发方案:

                  - 多进程

                  - 多线程

                  - 异步非阻塞模块(Twisted), 爬虫中学的scrapy框架(内部是用单线程完成并发)

           5、什么是异步非阻塞?

                  - 非阻塞,不等待。

                         比如创建socket对某个地址进行connect、获取接收数据recv时默认都会等待(连接成功或接收到数据),才执行后续操作。

                         如果设置setblocking(False),以上两个过程就不再等待,但是会报BlockingIOError的错误,只要捕获即可。

                  - 异步,通知,执行完成之后自动执行回调函数或自动执行某些操作(通知)。

                         比如做爬虫中向某个地址baidu.com发送请求,当请求执行完成之后自动执行回调函数。

           6、什么是同步阻塞?

                  - 阻塞:等

                  - 同步:按照顺序逐步执行

  • 相关阅读:
    Java标签实现分页
    Servlet过滤器介绍之原理分析
    java超强分页标签演示
    Mybatis中resultMap与resultType区别
    洛谷 P1002 过河卒
    洛谷 P2181 对角线
    ACM常数优化
    2020 年百度之星·程序设计大赛
    AtCoder Beginner Contest 171 E
    AtCoder Beginner Contest 171 D
  • 原文地址:https://www.cnblogs.com/xihuanniya/p/9877437.html
Copyright © 2011-2022 走看看