zoukankan      html  css  js  c++  java
  • python-day35(协程,IO多路复用)

    一. 协程

      协程: 是单线程下的并发,又称微线程,纤程,英文名Coroutine. 

      并发: 切换+保存状态

        协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的

      协程特点:

        1. 必须在只有一个单线程里实现并发

        2. 修改共享数据不需要加锁

        3. 用户程序自己保存多个控制流的上线文栈

        4. 一个协程遇到IO操作自动切换到其他协程(如何实现检测IO,yield,greenlet都无法实现,

          就用到了gevent模块(select机制))

      Greenlet模块

        单纯的切换(在没有IO的情况下或者没有重复开辟内存空间的操作),反而会降低程序的执行速度

      #真正的协程模块就是使用greenlet完成的切换
    from greenlet import greenlet
    
    def eat(name):
        print('%s eat 1' %name)  #2
        g2.switch('taibai')   #3
        print('%s eat 2' %name) #6
        g2.switch() #7
    def play(name):
        print('%s play 1' %name) #4
        g1.switch()      #5
        print('%s play 2' %name) #8
    
    g1=greenlet(eat)
    g2=greenlet(play)
    
    g1.switch('taibai')#可以在第一次switch时传入参数,以后都不需要  1
    #安装
    pip install greenlet
     1 # import time
     2 #
     3 # import greenlet
     4 # def fun1():
     5 #     time.sleep(2)
     6 #     print('约吗?')
     7 #     g2.switch()
     8 #     time.sleep(2)
     9 #     print('不约')
    10 #     g2.switch()
    11 # def fun2():
    12 #     time.sleep(2)
    13 #     print('你好')
    14 #     g1.switch()
    15 #     time.sleep(2)
    16 #     print('你不好')
    17 #
    18 # g1 = greenlet.greenlet(fun1)
    19 # g2 = greenlet.greenlet(fun2)
    20 # g1.switch()
    21 # ----------------------------------------------------
    22 # import time
    23 # import greenlet
    24 # def f1(name):
    25 #     print(name)
    26 #     time.sleep(2)
    27 #     a = g2.switch(name+'1')
    28 #     print(a)
    29 #
    30 # def f2(name):
    31 #     print(name)
    32 #     time.sleep(2)
    33 #     g1.switch(name+'1')
    34 #
    35 # a = time.time()
    36 # g1 = greenlet.greenlet(f1)
    37 # g2 = greenlet.greenlet(f2)
    38 # g1.switch('hui')
    39 # b = time.time()
    40 # print(b-a)
    greenlet练习

      Gevent模块

    #安装
    pip install gevent
    #用法
    g1 = gevent.spawn(func,多个参数) 创建一个协程对象g1, spawn(函数名,多个参数)
    spawn是异步提交任务
    
    g2 = gevent.spawn(func2)
    
    g1.join() #等待g1结束,上面只是创建协程对象,这个join才是去执行
    g2.join() #等待g2结束,

    gevent.joinall([g1,g2]) #等待列表中的协程都结束
    g1.value #拿到func1的返回值
    gevent.sleep(2)# 睡2秒 遇到IO切换
    # 不能识别time.sleep 解决办法 from gevent import monkey;monkey.patch_all()

      遇到IO阻塞是自动切换任务

    import gevent
    def eat(name):
        print('%s eat 1' %name)
        gevent.sleep(2)
        print('%s eat 2' %name)
    
    def play(name):
        print('%s play 1' %name)
        gevent.sleep(1)
        print('%s play 2' %name)
    
    
    g1=gevent.spawn(eat,'egon')
    g2=gevent.spawn(play,name='egon')
    g1.join()
    g2.join()
    #或者gevent.joinall([g1,g2])
    print('')

      gevent.sleep(2)模拟的是gevent可以识别的io阻塞,

      而time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要下面一行的代码,打补丁,就可以识别

      from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,socket模块之前

      或者直接记忆成: 要用gevent, 需要将from gevent import monkey;monkey.patch_all()放到文件的开头

     1 # import gevent
     2 # import time
     3 # #gevent 不能识别 time.sleep
     4 # # 解决办法 from gevent import monkey;monkey.patch_all() 识别所有的IO 自动切换
     5 #
     6 # def func(n,):
     7 #     print('xxxx',n)
     8 #     gevent.sleep(2)
     9 #     # time.sleep(2) #不能识别
    10 #     print('ccccc')
    11 #
    12 # def func2(m):
    13 #     print('11111111',m)
    14 #     gevent.sleep(2)
    15 #     print('22222222')
    16 # g1 = gevent.spawn(func,'alex')  #  异步提交任务
    17 # g2 = gevent.spawn(func2,'马来西亚')
    18 #
    19 # # gevent.joinall([g1,g2]) #等多所有
    20 # g1.join() #执行并等待异步提交的任务完成
    21 # g2.join()#执行并等待异步提交的任务完成
    22 # print('代码结束')
    from gevent import monkey;monkey.patch_all()

    二. IO多路复用

      IO模型

    #1)  等待数据准备(Waiting for the data to be ready)
    #2)  将数据从内核拷贝到进程中(Copying the data from the kernel to the process)
    #1、输入操作:read、readv、recv、recvfrom、recvmsg共5个函数,如果会阻塞状态,
      则会经理wait data和copy data两个阶段,如果设置为非阻塞则在wait 不到data时抛出异常 #2、输出操作:write、writev、send、sendto、sendmsg共5个函数,
      在发送缓冲区满了会阻塞在原地,如果设置为非阻塞,则会抛出异常 #3、接收外来链接:accept,与输入操作类似 #4、发起外出链接:connect,与输出操作类似
     

      1. 阻塞IO

      

    blocking IO的特点就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)都被block了。

      这里我们回顾一下同步/异步/阻塞/非阻塞:

        同步:提交一个任务之后要等待这个任务执行完毕

        异步:只管提交任务,不等待这个任务执行完毕就可以去做其他的事情

        阻塞:recv、recvfrom、accept,线程阶段  运行状态-->阻塞状态-->就绪

        非阻塞:没有阻塞状态

      在一个线程的IO模型中,我们recv的地方阻塞,我们就开启多线程,但是不管你开启多少个线程,'

      这个recv的时间是不是没有被规避掉,不管是多线程还是多进程都没有规避掉这个IO时间。

      2. 非阻塞IO模型

        

        非阻塞IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有

     1 # 服务端
     2 import socket
     3 import time
     4 
     5 server=socket.socket()
     6 server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
     7 server.bind(('127.0.0.1',8083))
     8 server.listen(5)
     9 
    10 server.setblocking(False) #设置不阻塞
    11 r_list=[]  #用来存储所有来请求server端的conn连接
    12 w_list={}  #用来存储所有已经有了请求数据的conn的请求数据
    13 
    14 while 1:
    15     try:
    16         conn,addr=server.accept() #不阻塞,会报错
    17         r_list.append(conn)  #为了将连接保存起来,不然下次循环的时候,上一次的连接就没有了
    18     except BlockingIOError:
    19         # 强调强调强调:!!!非阻塞IO的精髓在于完全没有阻塞!!!
    20         # time.sleep(0.5) # 打开该行注释纯属为了方便查看效果
    21         print('在做其他的事情')
    22         print('rlist: ',len(r_list))
    23         print('wlist: ',len(w_list))
    24 
    25 
    26         # 遍历读列表,依次取出套接字读取内容
    27         del_rlist=[] #用来存储删除的conn连接
    28         for conn in r_list:
    29             try:
    30                 data=conn.recv(1024) #不阻塞,会报错
    31                 if not data: #当一个客户端暴力关闭的时候,会一直接收b'',别忘了判断一下数据
    32                     conn.close()
    33                     del_rlist.append(conn)
    34                     continue
    35                 w_list[conn]=data.upper()
    36             except BlockingIOError: # 没有收成功,则继续检索下一个套接字的接收
    37                 continue
    38             except ConnectionResetError: # 当前套接字出异常,则关闭,然后加入删除列表,等待被清除
    39                 conn.close()
    40                 del_rlist.append(conn)
    41 
    42 
    43         # 遍历写列表,依次取出套接字发送内容
    44         del_wlist=[]
    45         for conn,data in w_list.items():
    46             try:
    47                 conn.send(data)
    48                 del_wlist.append(conn)
    49             except BlockingIOError:
    50                 continue
    51 
    52 
    53         # 清理无用的套接字,无需再监听它们的IO操作
    54         for conn in del_rlist:
    55             r_list.remove(conn)
    56         #del_rlist.clear() #清空列表中保存的已经删除的内容
    57         for conn in del_wlist:
    58             w_list.pop(conn)
    59         #del_wlist.clear()
    60 
    61 #客户端
    62 import socket
    63 import os
    64 import time
    65 import threading
    66 client=socket.socket()
    67 client.connect(('127.0.0.1',8083))
    68 
    69 while 1:
    70     res=('%s hello' %os.getpid()).encode('utf-8')
    71     client.send(res)
    72     data=client.recv(1024)
    73 
    74     print(data.decode('utf-8'))
    75 
    76 
    77 ##多线程的客户端请求版本
    78 # def func():
    79 #     sk = socket.socket()
    80 #     sk.connect(('127.0.0.1',9000))
    81 #     sk.send(b'hello')
    82 #     time.sleep(1)
    83 #     print(sk.recv(1024))
    84 #     sk.close()
    85 #
    86 # for i in range(20):
    87 #     threading.Thread(target=func).start()
    非阻塞io示例

      3. IO多路复用

      

     强调:

        1. 如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,

          可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。

        2. 在多路复用模型中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。

          只不过process是被select这个函数block,而不是被socket IO给block。

      python中的select模块:

    import select
    
    fd_r_list, fd_w_list, fd_e_list = select.select(rlist, wlist, xlist, [timeout])
    
    参数: 可接受四个参数(前三个必须)
        rlist: wait until ready for reading  #等待读的对象,你需要监听的需要获取数据的对象列表
        wlist: wait until ready for writing  #等待写的对象,你需要写一些内容的时候,input等等,也就是说我会循环他看看是否有需要发送的消息,如果有我取出这个对象的消息并发送出去,一般用不到,这里我们也给一个[]。
        xlist: wait for an “exceptional condition”  #等待异常的对象,一些额外的情况,一般用不到,但是必须传,那么我们就给他一个[]。
        timeout: 超时时间
        当超时时间 = n(正整数)时,那么如果监听的句柄均无任何变化,则select会阻塞n秒,之后返回三个空列表,如果监听的句柄有变化,则直接执行。
    返回值:三个列表与上面的三个参数列表是对应的
      select方法用来监视文件描述符(当文件描述符条件不满足时,select会阻塞),当某个文件描述符状态改变后,会返回三个列表
        1、当参数1 序列中的fd满足“可读”条件时,则获取发生变化的fd并添加到fd_r_list中
        2、当参数2 序列中含有fd时,则将该序列中所有的fd添加到 fd_w_list中
        3、当参数3 序列中的fd发生错误时,则将该发生错误的fd添加到 fd_e_list中
        4、当超时时间为空,则select会一直阻塞,直到监听的句柄发生变化
     1 #服务端
     2 from socket import *
     3 import select
     4 server = socket(AF_INET, SOCK_STREAM)
     5 server.bind(('127.0.0.1',8093))
     6 server.listen(5)
     7 # 设置为非阻塞
     8 server.setblocking(False)
     9 
    10 # 初始化将服务端socket对象加入监听列表,后面还要动态添加一些conn连接对象,当accept的时候sk就有感应,当recv的时候conn就有动静
    11 rlist=[server,]
    12 rdata = {}  #存放客户端发送过来的消息
    13 
    14 wlist=[]  #等待写对象
    15 wdata={}  #存放要返回给客户端的消息
    16 
    17 print('预备!监听!!!')
    18 count = 0 #写着计数用的,为了看实验效果用的,没用
    19 while True:
    20     # 开始 select 监听,对rlist中的服务端server进行监听,select函数阻塞进程,直到rlist中的套接字被触发(在此例中,套接字接收到客户端发来的握手信号,从而变得可读,满足select函数的“可读”条件),被触发的(有动静的)套接字(服务器套接字)返回给了rl这个返回值里面;
    21     rl,wl,xl=select.select(rlist,wlist,[],0.5)
    22     print('%s 次数>>'%(count),wl)
    23     count = count + 1
    24     # 对rl进行循环判断是否有客户端连接进来,当有客户端连接进来时select将触发
    25     for sock in rl:
    26         # 判断当前触发的是不是socket对象, 当触发的对象是socket对象时,说明有新客户端accept连接进来了
    27         if sock == server:
    28             # 接收客户端的连接, 获取客户端对象和客户端地址信息
    29             conn,addr=sock.accept()
    30             #把新的客户端连接加入到监听列表中,当客户端的连接有接收消息的时候,select将被触发,会知道这个连接有动静,有消息,那么返回给rl这个返回值列表里面。
    31             rlist.append(conn)
    32         else:
    33             # 由于客户端连接进来时socket接收客户端连接请求,将客户端连接加入到了监听列表中(rlist),客户端发送消息的时候这个连接将触发
    34             # 所以判断是否是客户端连接对象触发
    35             try:
    36                 data=sock.recv(1024)
    37                 #没有数据的时候,我们将这个连接关闭掉,并从监听列表中移除
    38                 if not data:
    39                     sock.close()
    40                     rlist.remove(sock)
    41                     continue
    42                 print("received {0} from client {1}".format(data.decode(), sock))
    43                 #将接受到的客户端的消息保存下来
    44                 rdata[sock] = data.decode()
    45 
    46                 #将客户端连接对象和这个对象接收到的消息加工成返回消息,并添加到wdata这个字典里面
    47                 wdata[sock]=data.upper()
    48                 #需要给这个客户端回复消息的时候,我们将这个连接添加到wlist写监听列表中
    49                 wlist.append(sock)
    50             #如果这个连接出错了,客户端暴力断开了(注意,我还没有接收他的消息,或者接收他的消息的过程中出错了)
    51             except Exception:
    52                 #关闭这个连接
    53                 sock.close()
    54                 #在监听列表中将他移除,因为不管什么原因,它毕竟是断开了,没必要再监听它了
    55                 rlist.remove(sock)
    56     # 如果现在没有客户端请求连接,也没有客户端发送消息时,开始对发送消息列表进行处理,是否需要发送消息
    57     for sock in wl:
    58         sock.send(wdata[sock])
    59         wlist.remove(sock)
    60         wdata.pop(sock)
    61 
    62     # #将一次select监听列表中有接收数据的conn对象所接收到的消息打印一下
    63     # for k,v in rdata.items():
    64     #     print(k,'发来的消息是:',v)
    65     # #清空接收到的消息
    66     # rdata.clear()
    67 
    68 ---------------------------------------
    69 #客户端
    70 from socket import *
    71 
    72 client=socket(AF_INET,SOCK_STREAM)
    73 client.connect(('127.0.0.1',8093))
    74 
    75 
    76 while True:
    77     msg=input('>>: ').strip()
    78     if not msg:continue
    79     client.send(msg.encode('utf-8'))
    80     data=client.recv(1024)
    81     print(data.decode('utf-8'))
    82 
    83 client.close()
    select网络IO模型的示例代码

      4. 异步IO

      貌似异步IO这个模型很牛~~但是你发现没有,这不是我们自己代码控制的,都是操作系统完成的,而python在copy数据这个阶段没有提供操纵操作系统的接口,所以用python没法实现这套异步IO机制,其他几个IO模型都没有解决第二阶段的阻塞(用户态和内核态之间copy数据),但是C语言是可以实现的,因为大家都知道C语言是最接近底层的,虽然我们用python实现不了,但是python仍然有异步的模块和框架(tornado、twstied,高并发需求的时候用),这些模块和框架很多都是用底层的C语言实现的,它帮我们实现了异步,你只要使用就可以了,但是你要知道这个异步是不是很好呀,不需要你自己等待了,操作系统帮你做了所有的事情,你就直接收数据就行了,就像你有一张银行卡,银行定期给你打钱一样。

     >>>>>>>>更多协程

    >>>>>>>>更多IO操作

  • 相关阅读:
    ubuntu下安装pip
    [算法]获得最短路径的Floyd与Dijkstra算法
    win2003终端服务授权
    Cookie 读取,解决中文乱码
    MOSS自动备份
    MOSS 开发收藏
    Private Protect Partial Internal Public 区别
    怎么设置OUTLOOK接收邮件时,网站邮箱的原始文件也保存着?
    正则表达式实战
    SQL Server 2005 数据库用户和登录帐户设置关链
  • 原文地址:https://www.cnblogs.com/Thui/p/10065578.html
Copyright © 2011-2022 走看看