zoukankan      html  css  js  c++  java
  • Python说文解字_Python之多任务_04

    问:并发、并行、同步、异步、阻塞、非阻塞

    答:

      并发、并行:

      并发是指一个时间段内(不是指的时间点),有几个程序在同一个CPU上运行,但是任意时刻只有一个程序在CPU上运行。对人类的时钟来说1秒钟能干很多事,但是计算机1秒钟运算上亿次。让我感觉是很多程序一起运行,其实是一个程序在运行。

      并行是指任意时刻点(这里这里是时刻点 ),有很多个程序同时(这里注意是同时 )在多个CPU上运行。

      如果一个CPU是四核,我们最高的并行是4核。

      同步、异步:(涉及到IO操作的时候要考虑的,这是属于消息操作的一种方式

      同步是指代码调用IO操作时,必须等待IO操作完成才能返回的调用方式。

      异步是指代码调用IO操作时,不必等待IO操作完成就返回的调用方法。(多线程就是典型的异步操作)

      阻塞、非阻塞:(涉及到IO操作的时候要考虑的,就是注意挂起的问题。这是属于函数调用的一种方式

      阻塞指的是调用函数时候当前线程被挂起

      非阻塞指调用函数的时候当前线程不会被挂起,而是立刻返回。

    问:一个问题的提出:C10K问题是什么?

    答:C10K问题是在1999年被提出来的技术挑战:如何一颗1GHz CPU,2G内存,1gbps网络环境下,让单台服务器同时为1万个客户端提供FTP服务。

      在早期的互联网用户非常少,不会考虑到并发的问题。一个线程只能处理一个Socket,如果用这种模式不能让一个服务器开启上万个客户的。

    问:UNIX下的五种IO模型:

    答:

      阻塞式IO:

      非阻塞式IO:

      IO多路复用

      信号驱动式IO:现在使用非常少:

      异步IO(POSIX的aio_系列函数):

      这五种IO模式是一个递进发展的关系。

    问:阻塞式IO、非阻塞式IO:

    答:比如之前的的socket编程就会遇到很多阻塞式IO

      1.client.connect(host,80)、client.recv(1024):

      就会遇到三次握手(关于三次握手四次挥手请参照:https://blog.csdn.net/li0978/article/details/52598121),这个过程实际上是阻塞的,如果当前这个网络连接不返回的话,会一直等待网络数据的返回。IO的操作时间和CPU的时间差距非常大,对CPU的利用率非常低的,网络中CPU的资源是非常重要的资源。时间浪费非常严重。

      比如我们可以设置client.setblocking(False)的话,connect会立刻返回。但是非阻塞式IO会带来一些问题。如果网路连接没有建立好,send会出问题的,这样就不停的询问连接是否建立好。connect阻塞不会消耗CPU的,但是我们要进行后续的操作,我们必须要确定connect是否连接好了,所以这里需要While True:循环一直询问。但是While循环会消耗CPU的。其实还不如block让它阻塞掉,但是如下下面的代码不依赖于连接,这种非阻塞式IO就非常有用的。可以转而去做其他的事情。

      内核:就是操作系统为了保护内存,保留一部分内存给操作系统用,比如我们在调用recvfrom函数是深入到操作系统的函数,再去请求我们的网络,然后在拷贝到应用程序的缓存地址里面。

    问:继续上面的问题:我们将数据从内核复制到用户空间,告诉我们的程序准备好了呢?

    答:这就是IO复用:

      select poll epoll是我们最常用的三种命令方式。

      select的方法其实也是一种阻塞的方法。但是和我们当时的While有很多的区别,他可以监听多个socket的状态。前面只能监听一个。监听多个给我们一个非常大的好处。是现在高并发技术应用的最多的点。但是从将数据从内核复制到用户的空间还是需要时间的。但是把很多步骤省略了。

    问:信号驱动式IO

    答:建立一个信号处理程序,是一种基于信号来的。但是现在应用非常少。

    问:异步IO

    答:aio开头的。这是真正的异步IO。他会将数据从内核复制到用户空间之后,再回发送信号处理程序处理数据报。是操作系统给我们准备好了之后再发送。

    问:IO复用中的select  poll  epoll:

    答:IO复用和异步IO是现在比较常用的技术。这三个都是IO多路复用的机制。IO多路复用就是通过一种机制、一个进程可以监视多个描述符,一旦某个描述符就绪(一般是就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步IO,因为它们都需要在读写时间就绪后自己负责进行读写,也就是说读写过程是阻塞的,而一部IO则无需自己负责进行读写,异步IO会负责把数据从内核拷贝到用户空间。

      因此:IO多路复用(同步IO) vs  异步IO,它们之间是这么一种关系。

    问:select:

    答:selcet函数监视的文件描述分三类:

      writefds

      readfds  

      exceptfds

      调用后,select函数会阻塞,知道有描述符就绪(有数据可读、可写或者有except)或者超时(timeout指定等待时间,如果立刻返回设为null即可),函数返回。当selcet函数返回后,可以遍历fdset,来找到就绪的描述符。

      selcet目前几乎在所有平台上支持,其良好的跨平台支持也是他的一个优点。selcet的一个缺点在于单个继承能够监视文件描述符的数量存在最大显示,在Linux上一般为1024,但是可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样荷叶灰造成效率的极低。

    问:poll:

    答:不同于selct使用三个位图来表示三个fdset的方式,poll使用一个pollfd的指针实现。

      pollfd解耦股包含了要监视的event和发生的event,不再使用 select"参数-值"传递方式。同时pollfd并没有最大数量限制(但是数量过大后性能也是会下降的)。和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符。

      从上面看,select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符的增长,其效率也会线性下降。

    问:epoll:(在Linux下面支持,在Windows下面不支持的):

    答:epoll是在2.6内核中提出的,是之前的select和poll的增强版本。相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的实际存在到内核的一个事件表中,这样用户和内核控件的copy只需一次。epoll其实时使用了红黑树算法实现的。具体的红黑树算法见:https://www.cnblogs.com/skywang12345/p/3245399.html。epoll并不代表比select好。在并发高的情况下,连接活跃度不高,epoll比select好。并发不高的话,同时连接很活跃,select比epoll好。

    问:相关的举例:

    答:

      1.通过非阻塞IO实现http请求。

    import socket
    from urllib.parse import urlparse
    
    # 通过非阻塞IO完成http请求
    def get_url(url):
        # 通过socket请求html
        url = urlparse(url)
        host = url.netloc
        path = url.path
        if path == "":
            path = "/"
    
        # 建立socket连接
        client = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        client.setblocking(False)
        try:
            client.connect((host,80))  # 阻塞不会消耗CPU
        except BlockingIOError as e:
            pass
    
        # 不停的询问连接是否建立好,需要while循环不停的去检查状态
        # 做计算任务或再次发起连接请求。
    
        while True:
            try:
                client.send("GET {} HTTP/1.1
    Host:{}
    Connection:close
    
    ".format(path, host).encode('utf8'))
                break
            except OSError as e:
                pass
    
    
        data = b""
        while True:
            try:
                d = client.recv(1024)
            except BlockingIOError as e:
                continue
            if d:
                data += d
            else:
                break
    
        data = data.decode("utf-8")
        html_data = data.split("
    
    ")[1]
        print(html_data)
        client.close()
    
    if __name__ == '__main__':
        get_url("http://www.baidu.com")

      

      2. 通过IO复用的select的方法:

      我们这里使用selectors这个包的DefaultSelector的包,这个包比select包装更好的,而且选择poll方法和epoll方法会根据平台自动选择。还给我们提供了注册的机制。

    import socket
    from urllib.parse import urlparse
    import select
    from selectors import DefaultSelector,EVENT_READ,EVENT_WRITE
    
    selector = DefaultSelector()
    
    class Fetcher:
        def connected(self, key):
            selector.unregister(key.fd)
            self.client.send("GET {} HTTP/1.1
    Host:{}
    Connection:close
    
    ".format(self.path, self.host).encode('utf8'))
            selector.register(self.client.fileno(),EVENT_READ,self.readable)
    
        def readable(self,key):
            d = self.client.recv(1024)
            if d:
                self.data += d
            else:
                selector.unregister(key.fd)
                data = self.data.decode("utf-8")
                html_data = data.split("
    
    ")[1]
                print(html_data)
                self.client.close()
    
        def get_url(self,url):
            url = urlparse(url)
            self.host = url.netloc
            self.path = url.path
            self.data = b""
            if self.path == "":
                self.path = "/"
    
            # 建立socket连接
            self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.client.setblocking(False)
    
            try:
                self.client.connect((self.host,80))  # 阻塞不会消耗CPU
            except BlockingIOError as e:
                pass
    
            #!!!!注册!!!!
            selector.register(self.client.fileno(),EVENT_WRITE,self.connected)
    
    def loop():
        # 时间循环:不停的情请求socket的状态并调用对应的回调函数。
        # 1. select 本身是不支持register模式的,
        # 2. socket状态编号以后的回调是由程序员完成的。
        while True:
            ready = selector.select()
            for key,mask in ready:
                call_back = key.data
                call_back(key)
        # 回调+时间循环+select(poll/epoll)
    
    if __name__ == '__main__':
        fetcher = Fetcher()
        fetcher.get_url("http://www.baidu.com")
        loop()

      运行这段代码的时候会提示:OSError: [WinError 10022] 提供了一个无效的参数。在Linux下面不会报错。

      因此我们在Windows底下添加两个全局变量进行更改,就不会抛异常了:

    import socket
    from urllib.parse import urlparse
    import select
    from selectors import DefaultSelector,EVENT_READ,EVENT_WRITE
    
    selector = DefaultSelector()
    urls = ["http://www.baidu.com"]
    stop = False
    
    
    class Fetcher:
        def connected(self, key):
            selector.unregister(key.fd)
            self.client.send("GET {} HTTP/1.1
    Host:{}
    Connection:close
    
    ".format(self.path, self.host).encode('utf8'))
            selector.register(self.client.fileno(),EVENT_READ,self.readable)
    
        def readable(self,key):
            d = self.client.recv(1024)
            if d:
                self.data += d
            else:
                selector.unregister(key.fd)
                data = self.data.decode("utf-8")
                html_data = data.split("
    
    ")[1]
                print(html_data)
                self.client.close()
                urls.remove(self.spider_url)
                if not urls:
                    global stop
                    stop = True
    
        def get_url(self,url):
            self.spider_url = url
            url = urlparse(url)
            self.host = url.netloc
            self.path = url.path
            self.data = b""
            if self.path == "":
                self.path = "/"
    
            # 建立socket连接
            self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.client.setblocking(False)
    
            try:
                self.client.connect((self.host,80))  # 阻塞不会消耗CPU
            except BlockingIOError as e:
                pass
    
            #!!!!注册!!!!
            selector.register(self.client.fileno(),EVENT_WRITE,self.connected)
    
    def loop():
        # 时间循环:不停的情请求socket的状态并调用对应的回调函数。
        # 1. select 本身是不支持register模式的,
        # 2. socket状态编号以后的回调是由程序员完成的。
        while not stop:
            ready = selector.select()
            for key,mask in ready:
                call_back = key.data
                call_back(key)
        # 回调+时间循环+select(poll/epoll)
    
    if __name__ == '__main__':
        fetcher = Fetcher()
        fetcher.get_url("http://www.baidu.com")
        loop()

      这种方式的好处就是并发性高。

    问:回调之痛?

    答:回调会产生很多问题:

      如果回调函数执行不正常该如何?

      如果回调里面还要嵌套回调怎么办?要嵌套很多层怎么办?

      如果嵌套了多层,其中某个环节出错了会造成什么后果?

      如果有一个数据需要被每个回调都处理怎么办?

      怎么使用当前函数中的局部变量?

      总结回调的问题有三个方面:

      1.可读性差

      2.共享状态管理困难

      3.异常处理困难

    问:C10M问题?

    答:C10M问题是随着互联网的飞速发展,如果利用八核CPU,64G内存,在10gbps的网络上保持10000并发并连接?C10K也满足不了我们了。因此这里就用到了协程了。

    问:我们有什么样的处理思路?

    答:回调模式有很多的缺点,协程就是为了编写难的问题。我们列举一下当前存在的问题:

      1.回调模式编码复杂度高

      2.同步编程的并发性不高

      3.多线程编程需要线程间同步,用的锁的机制。

      怎么解决?

      1.采用同步的方式去编写异步的代码

      2.使用单线程去切换任务:

        1.线程是由操作系统切换的,单线程切换意味着我们需要程序员自己去调度任务

        2.不再需要锁,并发性高。如果我们能在单线程直接切换就好像函数之间的调用一样,如果单线程内切换函数,性能远高于线程切换,而且它的并发性更高。如果我们声明1000个函数比声明1000个线程并发性越高。

      要实现这些对现有的编程模式有很大的挑战。

    问:一个关于函数的问题,关于函数是否可以暂停:

    答:比如我们有这么一段代码:

    def get_url(url):
        # do something:
        html = get_html(url)  # 此处暂停,切换到另一个函数去执行
        # parse html
        urls = parse_url(html)
    
    def get_url2():
        # do something:
        html = get_html(url)  # 此处暂停,切换到另一个函数去执行
        # parse html
        urls = parse_url(html)

      我们想一下:

      传统函数调用过程就是A执行完执行B执行完执行C。

      我们需要一个可以暂停的函数,并且可以在适当的时候回复该函数继续去执行。

      如果能够这两个,是不是可以玩儿了。因此这个地方就出现了协程

      协程有两个定义:有多个入口的函数或者说可以暂停的函数(可以向暂停的地方传入值),我们感觉到生成器就是一个可以暂停的概念。

    问:还记得我们的生成器怎么用嘛?

    答:我们用生成器对象,然后next去调用。生成器是使用了我们迭代协议的。

    def gen_func():
        yield 1
        yield 2
        yield 3
        return "bobby"
    
    if __name__ == '__main__':
        gen = gen_func() # 我们建立一个生成器对象
        print(next(gen))
        print(next(gen))
        print(next(gen))
        print(next(gen))
    StopIteration: bobby

      1. 生成器不只可以产生值,还可以接收值  

    def gen_func():
    # 1.可以产出值,2.可以接收值(调用方船体进来的值)
    html = yield "http://www.baidu.com"
    print(html)
    yield 2
    yield 3
    return "bobby"

    if __name__ == '__main__':
    gen = gen_func() # 我们建立一个生成器对象

    # 1.启动生成器方法又两种,next和send
    url = next(gen)
    html = "bobby111"
    print(gen.send(html))
    # 2. send方法可以传递值,进入生成器内部,同时还可以重启生成器执行到下一个yield位置。
    gen.send(html)

    # bobby111
    # 2

      将我们的值传递给生成器内部,这里要注意的地方:第一次调用send的时候不能send一个非None的值,跟前面的一样,首先要给生成器进行初始化对象。在调用send发送非None值之前,我们必须启动一次生成器,方式有两种:

      gen.send(None)

      next(gen)

      其中,html = yield 内容,跟我们说的一样,yield看做是return,把yield后面的值传递给html

    def gen_func():
        yield "http://www.baidu.com"
        yield 2
        yield 3
        return "bobby"
    
    if __name__ == '__main__':
        gen = gen_func()
        print(next(gen))
        gen.close()
        next(gen)
    
        # 抛出异常:StopIteration:generator ignored GeneratotExit

      我们看到他会抛异常。GeneratorExit是继承自BaseException,Exception是比他们更基础的,如果Try Except的话 。gen.close()不会抛异常。close()向上抛关闭。

    def gen_func():
        try:
            yield "http://www.baidu.com"
        except Exception as e:
            print(e)
        yield 2
        yield 3
        return "bobby"
    
    if __name__ == '__main__':
        gen = gen_func()
        print(next(gen))
        gen.throw(Exception,"download error")
        next(gen)
        # gen.throw(Exception,"download error")
    
        # http: // www.baidu.com
        # download
        # error

       另外,gen.throw的方式会抛第一个异常,需要处理,但是close不需要处理。

      我们上面介绍了生成器的新的内容send,close,throw。

      我们在介绍一个在Py3.3添加了yield from语法。

      首先介绍一下chain,这个是把一些可迭代对象,连接起来。用for循环进行遍历。

    from itertools import chain
    
    my_list = [1,2,3]
    my_dict = {
        "bobby1":"http://www.baidu.com",
        "bobby2":"http://www,sina.com"
    }
    
    for value in chain(my_list,my_dict,range(5,10)):
        print(value)
    
        # 1
        # 2
        # 3
        # bobby1
        # bobby2
        # 5
        # 6
        # 7
        # 8
        # 9

       我们再来改写一下一个函数来实现这个chain

    from itertools import chain
    
    my_list = [1,2,3]
    my_dict = {
        "bobby1":"http://www.baidu.com",
        "bobby2":"http://www,sina.com"
    }
    
    def my_chain(*args,**kwargs):
        for my_iterable in args:
            for value in my_iterable:
                yield value
    
    
    for value in my_chain(my_list,my_dict,range(5,10)):
        print(value)
    
        # 1
        # 2
        # 3
        # bobby1
        # bobby2
        # 5
        # 6
        # 7
        # 8
        # 9

       这里就有yield from将代码进一步缩减:

    yield from EXPR(可以简化)
    1. 子生成器可能只是一个迭代器,并不是一个作为协程的生成器,所以它不支持.throw 和 .close方法。
    2.如果自生成其支持.throw和close方法,但是子生成器内部,这两个方法都会抛出异常。
    3.调用方让子生成器自己抛出异常。
    4.当调用方使用next,send(None)函数,当调用方使用.send()发送非None值是,才调用子生成器.send方法。
    from itertools import chain
    
    my_list = [1,2,3]
    my_dict = {
        "bobby1":"http://www.baidu.com",
        "bobby2":"http://www,sina.com"
    }
    
    def my_chain(*args,**kwargs):
        for my_iterable in args:
            yield from my_iterable
            # for value in my_iterable:
            #     yield value
    
    
    for value in my_chain(my_list,my_dict,range(5,10)):
        print(value)
    
        # 1
        # 2
        # 3
        # bobby1
        # bobby2
        # 5
        # 6
        # 7
        # 8
        # 9

      再举一个例子:

    def g1(iterable):
        yield iterable
    def g2(iterable):
        yield from iterable
    
    for value in g1(range(10)):
        print(value)
    for value in g2(range(10)):
        print(value)
    
    
    # 0
    # 1
    # 2
    # 3
    # 4
    # 5
    # 6
    # 7
    # 8
    # 9

      

      再举一个最重要的例子:

    def g1(gen):
        yield from gen
    
    def main():
        g = g1()
        g.send(None)

       在这个例子中才是yield from最重要的应用,也是最核心的点,含义有两个方面:

      1. main为调用方,

      2. g1(委托生成器)

      3. gen(子生成器)

      4. yield from 会在调用方与子生成器之间一个双向通道。

      根据这个我们举一个yield from 比较详细的一个例子:

    final_result = {}
    
    def sales_sum(pro_name):
        total = 0
        nums = []
        while True:
            x = yield
            print(pro_name+"销量:",x)
            if not x:
                break
            total += x
            nums.append(x)
        return total,nums
    
    def middle(key):
        while True:
            final_result[key] = yield from sales_sum(key)
            print(key+"销量统计完成!!")
    
    def main():
        data_sets = {
            "bobby牌面膜":[1200,1500,3000],
            "bobby牌手机":[20,55,98,100],
            "bobby牌大一":[280,560,778,70]
        }
        for key,data_set in data_sets.items():
            m = middle(key)
            m.send(None) # 预激middle协程
            for value in data_set:
                m.send(value) # 给谢忱个传递每一组值
            m.send(None)
        print("final_result:",final_result)
    
    if __name__ == '__main__':
        main()
    
    # bobby牌面膜销量: 1200
    # bobby牌面膜销量: 1500
    # bobby牌面膜销量: 3000
    # bobby牌面膜销量: None
    # bobby牌面膜销量统计完成!!
    # bobby牌手机销量: 20
    # bobby牌手机销量: 55
    # bobby牌手机销量: 98
    # bobby牌手机销量: 100
    # bobby牌手机销量: None
    # bobby牌手机销量统计完成!!
    # bobby牌大一销量: 280
    # bobby牌大一销量: 560
    # bobby牌大一销量: 778
    # bobby牌大一销量: 70
    # bobby牌大一销量: None
    # bobby牌大一销量统计完成!!
    # final_result: {'bobby牌面膜': (5700, [1200, 1500, 3000]), 'bobby牌手机': (273, [20, 55, 98, 100]), 'bobby牌大一': (1688, [280, 560, 778, 70])}

      运用这个模式,就不用try exception处理异常了。就变得非常的简单,不用做大量的try...exception了。yield from帮我们完成了很多的工作。所以记住上面的那种格式。

      如下我们对yield from做一个总结:

      1. 子生成器产生的值,都是直接传给调用方的:调用方通过.send()发送的值都是直接传递给子生成器的:如果发送的是None,会调用子生成器__next__()方法,如果不是None,会调用子生成器的.send()方法。

      2.子生成器退出的时候,最后的return EXPR,会触发一个StopIteration(EXPR)异常;

      3.yield from表达式的值,是子生成器终止时,传递给StopIteration异常的第一个参数。

      4.如果调用的时候出现了StopIteration异常,委托生成器会恢复运行,同时其他的异常会向上“冒泡”;

      5.传入委托生成器的异常里,除了GenerationExit之外,其他的所有异常全部传递给子生成器的throw()方法,如果调用.throw()的是出现了StopIteration异常,那么就恢复委托生成器的运行,其他的异常全部向上“冒泡”;

      6.如果在委托生成器上调用.close()或传入GenerationExit异常,会调用自生成的.close()方法,没有的话就不会调用。如果在调用.close()的时候抛出了异常,那么就向上“冒泡”,否则委托生成器会抛出GenerationExit异常。

      

  • 相关阅读:
    容器虚拟化之LXC(LinuX Container)
    Twemproxy+Keepalived+Redis
    Haproxy+Keepalived+Nginx
    Redis-sentinel集群
    Server2008r2 指定默认用户自动登录--注册表
    .Net Core应用框架Util介绍(二) 转
    .Net Core应用框架Util介绍(一)转
    一分钟教你知道乐观锁和悲观锁的区别
    设置DataTable行属性
    js中的内置对象
  • 原文地址:https://www.cnblogs.com/noah0532/p/11017357.html
Copyright © 2011-2022 走看看