zoukankan      html  css  js  c++  java
  • 协程和异步io

    一. 并发、并行、同步、异步、阻塞、非阻塞

      1.并发:是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机(CPU)上运行,但任一个时刻点上只有一个程序在处理机上运行。

      2.并行:是指任何时间点,有多个程序运行在多个CPU上(最多和CPU数量一致)。

      3.并发和并行的区别:

        并发和并行是即相似又有区别的两个概念,并行是指两个或者多个事件在同一时刻发生;而并发是指两个或多个事件在同一时间间隔内发生。在多道程序环境下,并发性是指在一段时间内宏观上有多个程序在同时运行,但在单处理机系统中,每一时刻却仅能有一道程序执行,故微观上这些程序只能是分时地交替执行。倘若在计算机系统中有多个处理机,则这些可以并发执行的程序便可被分配到多个处理机上,实现并行执行,即利用每个处理机来处理一个可并发执行的程序,这样,多个程序便可以同时执行。

      4.同步:是指代码调用IO操作时,必须等待IO操作完成才能返回的调用方式。

      5.异步:是指代码调用IO操作时,不必等待IO操作完成就能返回的调用方式。

      6.阻塞:是指调用函数的时候当前线程被挂起。

      7.非阻塞:是指调用函数的时候当前线程不会被挂起,而是立即返回。

    二. C10K问题和io多路复用(select、poll、epoll)

      1.C10K问题:

        谓c10k问题,指的是服务器同时支持成千上万个客户端的问题,也就是concurrent 10 000 connection(这也是c10k这个名字的由来)。由于硬件成本的大幅度降低和硬件技术的进步,如果一台服务器同时能够服务更多的客户端,那么也就意味着服务每一个客户端的成本大幅度降低,从这个角度来看,问题显得非常有意义。

      2.五种I/O模型(详情:https://www.cnblogs.com/findumars/p/6361627.html):

        

        5.1阻塞I式/O:系统调用不会立即返回结果,当前线程会阻塞,等到获得结果或报错时在返回(问题:如在调用send()的同时,线程将被阻塞,在此期间,线程将无法执行任何运算或响应任何的网络请求。)

        5.2非阻塞式I/O:调用后立即返回结果(问题:不一定三次握手成功,recv() 会被循环调用,循环调用recv()将大幅度推高CPU 占用率),做计算任务或者再次发起其他连接就较有优势

        5.3I/O复用:它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。(阻塞式的方法,可以监听多个socket状态)(问题:将数据从内核复制到用户空间的时间不能省)

        5.4信号驱动式I/O:运用较少

        5.5异步I/O:它就像是用户进程将整个IO操作交给了他人(kernel)完成,然后他人做完后发信号通知。在此期间,用户进程不需要去检查IO操作的状态,也不需要主动的去拷贝数据。

      3.解决方法(参照:https://blog.csdn.net/wangtaomtk/article/details/51811011):

        3.1每个线程/进程处理一个连接:

          但是由于申请进程/线程会占用相当可观的系统资源,同时对于多进程/线程的管理会对系统造成压力,因此这种方案不具备良好的可扩展性。因此,这一思路在服务器资源还没有富裕到足够程度的时候,是不可行的;即便资源足够富裕,效率也不够高。

          问题:资源占用过多,可扩展性差。

        3.2每个进程/线程同时处理多个连接(IO多路复用):

          3.2.1传统思路

            最简单的方法是循环挨个处理各个连接,每个连接对应一个 socket当所有 socket 都有数据的时候,这种方法是可行的。但是当应用读取某个 socket 的文件数据不 ready 的时候,整个应用会阻塞在这里等待该文件句柄,即使别的文件句柄 ready,也无法往下处理。    

            思路:直接循环处理多个连接。问题:任一文件句柄的不成功会阻塞住整个应用。

          3.2.2select:

           

            思路:有连接请求抵达了再检查处理。

            问题:句柄上限+重复初始化+逐个排查所有文件句柄状态效率不高。

          3.2.3poll

            思路:设计新的数据结构提供使用效率。

            问题:逐个排查所有文件句柄状态效率不高。

          3.2.4epoll(nginx使用的是epoll)

            思路:只返回状态变化的文件句柄。

            问题:依赖特定平台(Linux)。

        注:epoll不一定比select好(在高并发的情况下,连接活跃度不是很高,epoll比select好;在并发性不高,同时连接很活跃select比epoll好(游戏))

    三. epoll+回调+事件循环方式url

      1. 通过非阻塞I/O实现http请求:

     1 import socket
     2 from urllib.parse import urlparse
     3 
     4 def get_url(url):
     5     #通过socket请求html
     6     url=urlparse(url)
     7     host=url.netloc
     8     path=url.path
     9     if path=="":
    10         path="/"
    11     #建立socket连接
    12     client=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    13     #设置成非阻塞(抛异常:BlockingIOError: [WinError 10035] 无法立即完成一个非阻止性套接字操作。)
    14     client.setblocking(False)
    15     try:
    16         client.connect((host,80))
    17     except BlockingIOError as e:
    18         pass
    19     #向服务器发送数据(还未连接会抛异常)
    20     while True:
    21         try:
    22             client.send("GET {} HTTP/1.1
    Host:{}
    Connection:close
    
    ".format(path, host).encode("utf8"))
    23             break
    24         except OSError as e:
    25             pass
    26     #将数据读取完
    27     data=b""
    28     while True:
    29         try:
    30             d=client.recv(1024)
    31         except BlockingIOError as e:
    32             continue
    33         if d:
    34             data+=d
    35         else:
    36             break
    37     #会将header信息作为返回字符串
    38     data=data.decode('utf8')
    39     print(data.split('
    
    ')[1])
    40     client.close()
    41 
    42 if __name__=='__main__':
    43     get_url('http://www.baidu.com')
    View Code

      2.使用select完成http请求(循环回调):

        优点:并发性高(驱动整个程序主要是回调循环loop(),不会等待,请求操作系统有什么准备好了,准备好了就执行【没有线程切换等,只有一个线程,当一个url连接建立完成后就会注册,然后回调执行】,省去了线程切换和内存)

     1 #自动根据环境选择poll和epoll
     2 from selectors import DefaultSelector,EVENT_READ,EVENT_WRITE
     3 selector=DefaultSelector()
     4 urls=[]
     5 #全局变量
     6 stop=False
     7 class Fetcher:
     8     def connected(self, key):
     9         #取消注册
    10         selector.unregister(key.fd)
    11         self.client.send("GET {} HTTP/1.1
    Host:{}
    Connection:close
    
    ".format(self.path, self.host).encode("utf8"))
    12         selector.register(self.client.fileno(),EVENT_READ,self.readable)
    13 
    14     def readable(self,key):
    15         d = self.client.recv(1024)
    16         if d:
    17             self.data += d
    18         else:
    19             selector.unregister(key.fd)
    20             # 会将header信息作为返回字符串
    21             data = self.data.decode('utf8')
    22             print(data.split('
    
    ')[1])
    23             self.client.close()
    24             urls.remove(self.spider_url)
    25             if not urls:
    26                 global stop
    27                 stop=True
    28 
    29     def get_url(self,url):
    30         self.spider_url = url
    31         url = urlparse(url)
    32         self.host = url.netloc
    33         self.path = url.path
    34         self.data = b""
    35         if self.path == "":
    36             self.path = "/"
    37         # 建立socket连接
    38         self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    39         self.client.setblocking(False)
    40         try:
    41             self.client.connect((self.host, 80))
    42         except BlockingIOError as e:
    43             pass
    44 
    45         #注册写事件,及回调函数
    46         selector.register(self.client.fileno(),EVENT_WRITE,self.connected)
    47 
    48 def loop():
    49     #回调+事件循环+select(poll/epoll)
    50     #事件循环,不停的调用socket的状态并调用对应的回调函数
    51     #判断哪个可读可写,select本身不支持register模式
    52     #socket状态变化后的回调使用程序员完成的
    53     if not stop:
    54         while True:
    55             ready=selector.select()
    56             for key,mask in ready:
    57                 call_back=key.data
    58                 call_back(key)
    59 
    60 
    61 if __name__=='__main__':
    62     fetcher=Fetcher()
    63     fetcher.get_url('http://www.baidu.com')
    64     loop()
    View Code

    四. 回调之痛

      1.可读性差;2.共享状态管理困难;3.异常处理困难

      

    协程能解决            

    五. C10M问题和协程

      1.C10M问题:

        如何利用8核心CPU,64G内存,在10gps的网络上保持1000万的并发连接。

      2.协程:

        2.1问题:回调模式编码复杂度高;同步编程的并发性不高;多线程需要线程间同步,look会降低性能

        2.2解决:

          采用同步的方式去编写异步的代码;

          采用单线程去解决任务:线程是由操作系统切换,单线程切换意味着需要我们自己去调度任务;不在需要锁,并发性高,如果单线程内切换函数,性能远高于线程切换,并发性更高。

        2.3协程:

          传统函数调用 过程 A->B->C;

          我们需要一个可以暂停的函数,并且可以在适当的时候恢复该函数的继续执行;

         出现了协程 -> 有多个入口的函数, 可以暂停的函数, 可以暂停的函数(可以向暂停的地方传入值);
     1 def get_url(url):
     2     #do someting 1
     3     html = get_html(url) #此处暂停,切换到另一个函数去执行
     4     # #parse html
     5     urls = parse_url(html)
     6 
     7 def get_url(url):
     8     #do someting 1
     9     html = get_html(url) #此处暂停,切换到另一个函数去执行
    10     # #parse html
    11     urls = parse_url(html)
    View Code

    六. 生成器的send和yield from

      1.生成器send和next方法:

        启动生成器方式有两种:1.next();2.send();

        生成器可以产出值;也可以接收值(调用方传递进来的值);

        send方法可以传递值进入生成器内部,同时还可以重启生成器执行到下一个yield的位置(注:在调用send()发送非none之前,我们必须启动一次生成器,否则会抛错,方式有两种gen.send(None)或者next(gen))

      2.close()方法:(关闭生成器)

        自己处理的话会抛异常,gen.close(),RuntimeError: generator ignored GeneratorExit,如果是except Exception就不会抛异常,GeneratorExit是继承至BaseException的,Exception也是继承于BaseException的

     1 def gen_func():
     2     #自己处理的话会抛异常,gen.close(),RuntimeError: generator ignored GeneratorExit
     3     try:
     4         yield 'https://www.baidu.com'
     5     #如果是except Exception就不会抛异常,GeneratorExit是继承至BaseException的,Exception也是继承于BaseException的
     6     except GeneratorExit as e:
     7         pass
     8     yield 1
     9     yield 2
    10     return 'LYQ'
    11 
    12 if __name__=='__main__':
    13     #抛异常StopIteration:
    14     gen=gen_func()
    15     print(next(gen))
    16     gen.close()
    17     print(next(gen))
    View Code

      3.throw()方法:向生成器中扔异常,需要自己处理,否则会抛错

     1 def gen_func():
     2     try:
     3         yield 'https://www.baidu.com'
     4     except Exception:
     5         pass
     6     yield 1
     7     yield 2
     8     return 'LYQ'
     9 
    10 if __name__=='__main__':
    11     #抛异常StopIteration:
    12     gen=gen_func()
    13     print(next(gen))
    14     #扔一个异常,是第一句的异常
    15     gen.throw(Exception,'download error')
    16     print(next(gen))
    17     #扔一个异常,是第二句的异常
    18     gen.throw(Exception,'download error')
    19     print(next(gen))
    View Code

       4.yield from:(Python 3.3新加的语法)

        4.1简介:

     1 from itertools import chain
     2 my_list=[1,2,3]
     3 my_dict={'name1':'LYQ1',
     4          'name2':'LYQ2'}
     5 #将所有值遍历输出
     6 # for value in chain(my_list,my_dict,range(5,10)):
     7 #     print(value)
     8 
     9 def g1(iterable):
    10     yield range(10)
    11 #yield from iterable
    12 def my_chain(*args,**kwargs):
    13     for my_iterable in args:
    14         #功能非常多
    15         yield from my_iterable
    16         # for value in my_iterable:
    17         #     yield value
    18 for value in my_chain(my_list,my_dict,range(5,10)):
    19     print(value)
    View Code

         4.2main调用方 g1:委托生成器 gen:子生成器:

     1 def g1(gen):
     2     yield from gen
     3 gen=range(10)
     4 def main():
     5     g=g1(gen)
     6     #直接发送给子生成器
     7     print(g.send(None))
     8 #main:调用方 g1:委托生成器 gen:子生成器
     9 #yield from会在调用方与子生成器之间建立一个双向通道
    10 main()
    View Code

         4.3例子:

     1 final_result = {}
     2 
     3 
     4 def sales_sum(pro_name):
     5     total = 0
     6     nums = []
     7     while True:
     8         x = yield
     9         print(pro_name + "销量: ", x)
    10         if not x:
    11             break
    12         total += x
    13         nums.append(x)
    14     #直接返回到yield from sales_sum(key)
    15     return total, nums
    16 
    17 
    18 def middle(key):
    19     while True:
    20         final_result[key] = yield from sales_sum(key)
    21         print(key + "销量统计完成!!.")
    22 
    23 
    24 def main():
    25     data_sets = {
    26         "面膜": [1200, 1500, 3000],
    27         "手机": [28, 55, 98, 108],
    28         "大衣": [280, 560, 778, 70],
    29     }
    30     for key, data_set in data_sets.items():
    31         print("start key:", key)
    32         m = middle(key)
    33         #直接send到子生成器里面(x = yield)
    34         m.send(None)  # 预激middle协程
    35         for value in data_set:
    36             m.send(value)  # 给协程传递每一组的值
    37         m.send(None)
    38     print("final_result:", final_result)
    39 
    40 
    41 if __name__ == '__main__':
    42     main()
    View Code

          无yield from:

     1 def sales_sum(pro_name):
     2     total = 0
     3     nums = []
     4     while True:
     5         x = yield
     6         print(pro_name + "销量: ", x)
     7         if not x:
     8             break
     9         total += x
    10         nums.append(x)
    11     #直接返回到yield from sales_sum(key)
    12     return total, nums
    13 
    14 if __name__ == "__main__":
    15     #直接与子生成器通信(没用yield from就需要捕获异常)
    16     my_gen = sales_sum("手机")
    17     my_gen.send(None)
    18     my_gen.send(1200)
    19     my_gen.send(1500)
    20     my_gen.send(3000)
    21     try:
    22         my_gen.send(None)
    23     #获取返回值
    24     except StopIteration as e:
    25         result = e.value
    26         print(result)
    View Code

      4.4介绍yield from详情:

     1 #pep380
     2 
     3 #1. RESULT = yield from EXPR可以简化成下面这样
     4 #一些说明
     5 """
     6 _i:子生成器,同时也是一个迭代器
     7 _y:子生成器生产的值
     8 _r:yield from 表达式最终的值
     9 _s:调用方通过send()发送的值
    10 _e:异常对象
    11 
    12 """
    13 
    14 _i = iter(EXPR)      # EXPR是一个可迭代对象,_i其实是子生成器;
    15 try:
    16     _y = next(_i)   # 预激子生成器,把产出的第一个值存在_y中;
    17 except StopIteration as _e:
    18     _r = _e.value   # 如果抛出了`StopIteration`异常,那么就将异常对象的`value`属性保存到_r,这是最简单的情况的返回值;
    19 else:
    20     while 1:    # 尝试执行这个循环,委托生成器会阻塞;
    21         _s = yield _y   # 生产子生成器的值,等待调用方`send()`值,发送过来的值将保存在_s中;
    22         try:
    23             _y = _i.send(_s)    # 转发_s,并且尝试向下执行;
    24         except StopIteration as _e:
    25             _r = _e.value       # 如果子生成器抛出异常,那么就获取异常对象的`value`属性存到_r,退出循环,恢复委托生成器的运行;
    26             break
    27 RESULT = _r     # _r就是整个yield from表达式返回的值。
    28 
    29 """
    30 1. 子生成器可能只是一个迭代器,并不是一个作为协程的生成器,所以它不支持.throw()和.close()方法;
    31 2. 如果子生成器支持.throw()和.close()方法,但是在子生成器内部,这两个方法都会抛出异常;
    32 3. 调用方让子生成器自己抛出异常
    33 4. 当调用方使用next()或者.send(None)时,都要在子生成器上调用next()函数,当调用方使用.send()发送非 None 值时,才调用子生成器的.send()方法;
    34 """
    35 _i = iter(EXPR)
    36 try:
    37     _y = next(_i)
    38 except StopIteration as _e:
    39     _r = _e.value
    40 else:
    41     while 1:
    42         try:
    43             _s = yield _y
    44         except GeneratorExit as _e:
    45             try:
    46                 _m = _i.close
    47             except AttributeError:
    48                 pass
    49             else:
    50                 _m()
    51             raise _e
    52         except BaseException as _e:
    53             _x = sys.exc_info()
    54             try:
    55                 _m = _i.throw
    56             except AttributeError:
    57                 raise _e
    58             else:
    59                 try:
    60                     _y = _m(*_x)
    61                 except StopIteration as _e:
    62                     _r = _e.value
    63                     break
    64         else:
    65             try:
    66                 if _s is None:
    67                     _y = next(_i)
    68                 else:
    69                     _y = _i.send(_s)
    70             except StopIteration as _e:
    71                 _r = _e.value
    72                 break
    73 RESULT = _r
    View Code

        看完代码,我们总结一下关键点:

        1. 子生成器生产的值,都是直接传给调用方的;调用方通过.send()发送的值都是直接传递给子生成器的;如果发送的是 None,会调用子生成器的__next__()方法,如果不是 None,会调用子生成器的.send()方法;
        2. 子生成器退出的时候,最后的return EXPR,会触发一个StopIteration(EXPR)异常;
        3. yield from表达式的值,是子生成器终止时,传递给StopIteration异常的第一个参数;
        4. 如果调用的时候出现StopIteration异常,委托生成器会恢复运行,同时其他的异常会向上 "冒泡";
        5. 传入委托生成器的异常里,除了GeneratorExit之外,其他的所有异常全部传递给子生成器的.throw()方法;如果调用.throw()的时候出现了StopIteration异常,那么就恢复委托生成器的运行,其他的异常全部向上 "冒泡";
        6. 如果在委托生成器上调用.close()或传入GeneratorExit异常,会调用子生成器的.close()方法,没有的话就不调用。如果在调用.close()的时候抛出了异常,那么就向上 "冒泡",否则的话委托生成器会抛出GeneratorExit异常。

    七. 生成器如何变成协程?

      1.生成器可以暂停并获取状态:

     1 #生成器是可以暂停的函数
     2 import inspect
     3 def gen():
     4     yield 1
     5     return True
     6 
     7 if __name__=='__main__':
     8     g1=gen()
     9     #获取生成器状态 GEN_CREATED(创建)
    10     print(inspect.getgeneratorstate(g1))
    11     next(g1)
    12     #GEN_SUSPENDED暂停
    13     print(inspect.getgeneratorstate(g1))
    14     try:
    15         next(g1)
    16     except StopIteration:
    17         pass
    18     #GEN_CLOSED关闭
    19     print(inspect.getgeneratorstate(g1))
    View Code

      2.协程的调度依然是 事件循环+协程模式 ,协程是单线程模式:

     1 #生成器是可以暂停的函数
     2 import inspect
     3 # def gen_func():
     4 #     value=yield from
     5 #     #第一返回值给调用方, 第二调用方通过send方式返回值给gen
     6 #     return "bobby"
     7 #1. 用同步的方式编写异步的代码, 在适当的时候暂停函数并在适当的时候启动函数
     8 import socket
     9 def get_socket_data():
    10     yield 1
    11 
    12 def downloader(url):
    13     client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    14     client.setblocking(False)
    15 
    16     try:
    17         client.connect((host, 80))  # 阻塞不会消耗cpu
    18     except BlockingIOError as e:
    19         pass
    20 
    21     selector.register(self.client.fileno(), EVENT_WRITE, self.connected)
    22     #如果get_socket_data()中出现异常,会直接抛给downloader(向上抛)
    23     source = yield from get_socket_data()
    24     data = source.decode("utf8")
    25     html_data = data.split("
    
    ")[1]
    26     print(html_data)
    27 
    28 def download_html(html):
    29     html = yield from downloader()
    30 
    31 if __name__ == "__main__":
    32     #协程的调度依然是 事件循环+协程模式 ,协程是单线程模式
    33     pass
    View Code

    八. async和await原生协程

      1.python为了将语义变得更加明确,就引入了async和await关键字定义原生的协程:

        生成器实现的协程又可以当生成器,又可以当协程,且代码凌乱,不利于后期维护。原生的协程中不可以yield,否则会抛错(让协程更加明确)

    可异步调用:实际实现了__await__魔法函数

        await:将控制权交出去并等待结果返回,await只能接收awaitable对象,可以理解成yield from

     1 # from collections import Awaitable
     2 #如果是函数,就要使用coroutine装饰器,实际将__await_指向___iter__
     3 # import types
     4 # @types.coroutine
     5 # def downloader(url):
     6 #     return "haha"
     7 
     8 async def downloader(url):
     9     return "haha"
    10 async def download_url(url):
    11     #将控制权交出去并等待结果返回,await只能接收awaitable对象,可以理解成yield from
    12     html=await downloader(url)
    13     return html
    14 
    15 if __name__=='__main__':
    16     coro=download_url('www.baidu.com')
    17     #原生协程不能调用next
    18     coro.send(None)
    View Code

     

  • 相关阅读:
    用spring tool suite插件创建spring boot项目时报An internal error occurred during: "Building UI model". com/google/common/
    Eclipse卸载插件SpringSoource-tool-suite
    spring-tool-suite使用教程,并创建spring配置文件
    MySQL获取指定长度的字符串的函数left(s,n)和right(s,n)
    maven web项目配置log4j,及log4j参数设置
    Caused by: org.apache.ibatis.reflection.ReflectionException: There is no getter for property named 'zoneId' in 'class java.lang.String'
    ifconfig 输出里没有IP地址
    maven web项目生成WebContent或WebRoot目录
    Window 命令行神器:cmder
    Filezilla Download for Linux (deb, rpm, txz, xz, amd64, i386, i486, i586, i686, x86_64)
  • 原文地址:https://www.cnblogs.com/lyq-biu/p/10481438.html
Copyright © 2011-2022 走看看