zoukankan      html  css  js  c++  java
  • gj12-2 协程和异步io

    12.3 epoll+回调+事件循环方式url

    import socket
    from urllib.parse import urlparse
    
    # 使用非阻塞io完成http请求
    
    def get_url(url):
        """Fetch ``url`` over plain HTTP (port 80) with a non-blocking socket and print the body.

        The socket is put in non-blocking mode, so both sending and receiving
        are done by polling in ``while`` loops until the operation succeeds.

        Args:
            url: Absolute ``http://`` URL; an empty path is requested as ``/``.
        """
        parsed = urlparse(url)
        host = parsed.netloc
        path = parsed.path or "/"

        request = (
            "GET {} HTTP/1.1\r\n"
            "Host:{}\r\n"
            "Connection:close\r\n"
            "\r\n"
        ).format(path, host).encode("utf8")

        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client.setblocking(False)
        try:
            try:
                client.connect((host, 80))
            except BlockingIOError as e:
                # A non-blocking connect returns before the handshake is done.
                print(e)

            # Keep asking whether the connection is usable by retrying the
            # send; any OSError is retried, so this loop spins until it works.
            while True:
                try:
                    client.send(request)
                    break
                except OSError:
                    pass

            data = b""
            while True:
                try:
                    d = client.recv(1024)
                except BlockingIOError:
                    # Nothing to read yet; poll again.
                    continue
                if not d:
                    # Empty read: the peer closed the connection.
                    break
                data += d
        finally:
            # Release the socket even when an unexpected error escapes.
            client.close()

        data = data.decode("utf8")
        # Everything after the first blank line is the body; partition keeps
        # blank lines inside the body and tolerates a missing separator.
        html_data = data.partition("\r\n\r\n")[2]
        print(html_data)
    
    
    if __name__ == "__main__":
        # The URL has to be passed as a string literal.
        get_url("http://www.baidu.com")
    
    通过非阻塞io实现http请求

    select + 回调 + 事件循环

    并发性高, 使用单线程

    import socket
    from urllib.parse import urlparse
    from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
    
    # DefaultSelector picks the I/O multiplexing implementation automatically
    selector = DefaultSelector()
    # The HTTP requests below are completed through this selector
    urls = []  # URLs that are still being crawled
    stop = False  # set to True by Fetcher.readable once `urls` is empty; ends loop()
    
    
    class Fetcher:
        """Callback-driven HTTP GET of one URL on top of the module-level ``selector``.

        ``get_url`` starts the connect and registers ``connected`` for
        writability; ``connected`` sends the request and registers ``readable``;
        ``readable`` accumulates the response and, when the peer closes, prints
        the body and removes the URL from the module-level ``urls`` list.
        """

        def get_url(self, url):
            """Start a non-blocking connect to ``url`` and wait for writability."""
            self.spider_url = url
            url = urlparse(url)
            self.host = url.netloc
            self.path = url.path
            self.data = b""
            if self.path == "":
                self.path = "/"

            # Create the socket in non-blocking mode
            self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.client.setblocking(False)

            try:
                self.client.connect((self.host, 80))
            except BlockingIOError:
                # A non-blocking connect returns before the handshake is done.
                pass

            # The socket becomes writable once the connection is established.
            selector.register(self.client.fileno(), EVENT_WRITE, self.connected)

        def connected(self, key):
            """Send the request once connected, then wait for readability."""
            selector.unregister(key.fd)
            self.client.send(
                "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(
                    self.path, self.host
                ).encode("utf8")
            )
            selector.register(self.client.fileno(), EVENT_READ, self.readable)

        def readable(self, key):
            """Read one chunk; on end of stream print the body and finish this URL."""
            global stop

            d = self.client.recv(1024)  # called again each time data is ready
            if d:
                self.data += d
                return

            # Empty read: the peer closed the connection, the response is complete.
            selector.unregister(key.fd)
            data = self.data.decode("utf8")
            # Everything after the first blank line is the body; partition keeps
            # blank lines inside the body and tolerates a missing separator.
            html_data = data.partition("\r\n\r\n")[2]
            print(html_data)
            self.client.close()

            urls.remove(self.spider_url)  # drop the finished URL from `urls`
            if not urls:
                stop = True
    
    
    def loop():
        """Event loop: poll the selector and run callbacks until ``stop`` is true.

        Each ready key carries its callback in ``key.data``; the callback is
        invoked with the key itself.
        """
        # 1. select itself has no register model
        # 2. the callback run after a socket state change is written by the programmer
        while not stop:
            for key, _mask in selector.select():
                key.data(key)
        # callback + event loop + select (poll/epoll)
    
    
    if __name__ == "__main__":
        import time

        start_time = time.time()
        # Issue 20 concurrent requests for the same page, one Fetcher each.
        for _ in range(20):
            url = "http://www.baidu.com/"
            urls.append(url)
            fetcher = Fetcher()
            fetcher.get_url(url)
        loop()
        print(time.time() - start_time)

    12.4 回调之痛

    将代码逻辑拆分成了几段,维护性不高

    如果回调函数执行不正常该如何?
    如果回调里面还要嵌套回调怎么办?要嵌套很多层怎么办?
    如果嵌套了多层,其中某个环节出错了会造成什么后果?
    如果有个数据需要被每个回调都处理怎么办?
    怎么使用当前函数中的局部变量?

    1.可读性差
    2.共享状态管理困难
    3.异常处理困难

    12.5 协程是什么

    C10M问题和协程

    如何利用8核心CPU,64G内存,在10gbps的网络上保持1000万并发连接

    1.回调模式编码复杂度高
    2.同步编程的并发性不高
    3.多线程编程需要线程间同步,lock


    1.采用同步的方式去编写异步的代码
    2.使用单线程去切换任务:
       1.线程是由操作系统切换的,单线程切换意味着我们需要程序员自己去调度任务
       2.不再需要锁,并发性高,如果单线程内切换函数,性能远高于线程切换,并发性更高

    def get_url(url):
        """Illustrative pseudo-code; get_html and parse_url are not defined in the visible snippets."""
        # do something 1
        html = get_html(url) # pause here and switch to another function
        # parse html
        urls = parse_url(html)
    
    def get_url(url):
        """Illustrative pseudo-code; get_html and parse_url are not defined in the visible snippets."""
        # do something 1
        html = get_html(url) # pause here and switch to another function
        # parse html
        urls = parse_url(html)
    
    # 传统函数调用 过程 A->B->C
    # 我们需要一个可以暂停的函数,并且可以在适当的时候恢复该函数的继续执行
    # 出现了协程 -> 有多个入口的函数, 可以暂停的函数(并且可以向暂停的地方传入值)
    协程 -> 可以暂停的函数

    12.6 生成器进阶-send、close和throw方法

    def gen_func():
        """Generator that produces values and also receives one from its caller.

        The value passed to ``send()`` when resuming from the first ``yield``
        is bound to ``html`` and printed.
        """
        # 1. can produce values, 2. can receive values (passed in by the caller)
        html = yield "http://lewen.com"
        print("inner:",html)
        yield 2
        yield 3
        return "lewen"
    
    
    
    if __name__ == "__main__":

        gen = gen_func()

        # 1. A generator can be started in two ways: next() or send().
        # Before send() can deliver a non-None value the generator has to be
        # started once, either with gen.send(None) or with next(gen).
        url = gen.send(None)
        # print(url)  # http://lewen.com
        # url = next(gen)

        # download url
        html = "lewen"

        # gen.send(html)  # inner: lewen

        print(gen.send(html))  # send() passes a value into the generator and resumes it up to the next yield
        """
        inner: lewen
        2
        """




        # print(next(gen))
        # print(next(gen))
        # print(next(gen))
        # print(next(gen))
    gen_send
    def gen_func():
        """Generator whose first ``yield`` is guarded by ``except Exception``.

        ``close()`` raises GeneratorExit at the paused ``yield``; that is not an
        ``Exception`` subclass, so the handler does not intercept it.
        """
        # 1. can produce values, 2. can receive values (passed in by the caller)
        try:
            yield "http://lewen.com"
        except Exception:
            pass

        # yield "http://projectsedu.com"
        yield 2
        yield 3
        return "lewen"
    
    
    if __name__ == "__main__":
        gen = gen_func()
        print(next(gen))
        gen.close()           # closes the generator
        print(next(gen))  # a closed generator raises StopIteration
    
    # ----
    http://lewen.com
    ---------------------------------------------------------------------------
    StopIteration                             Traceback (most recent call last)
    <ipython-input-8-7930c3f86cda> in <module>
         16     print(next(gen))
         17     gen.close()           # 关闭了生成器
    ---> 18     print(next(gen))  # StopIteration
    
    StopIteration:
    ----
    
    
    def gen_func():
        """Counter-example: swallow GeneratorExit and keep yielding.

        Because the generator yields again after catching GeneratorExit,
        ``close()`` fails with ``RuntimeError: generator ignored GeneratorExit``.
        """
        # 1. can produce values, 2. can receive values (passed in by the caller)
        try:
            yield "http://projectsedu.com"
        except GeneratorExit:
            pass

        # yield "http://projectsedu.com"
        yield 2
        yield 3
        return "lewen"
    
    
    if __name__ == "__main__":
        gen = gen_func()
        print(next(gen))
        gen.close()           # raises RuntimeError: the generator swallowed GeneratorExit and yielded again
        print(next(gen))  # not reached, because close() above raises


        # GeneratorExit inherits from BaseException, not from Exception
    
    # ---
    http://projectsedu.com
    ---------------------------------------------------------------------------
    RuntimeError                              Traceback (most recent call last)
    <ipython-input-9-a1ac8a75795f> in <module>
         15     gen = gen_func()
         16     print(next(gen))
    ---> 17     gen.close()           # 关闭了生成器
         18     print(next(gen))  # StopIteration
         19
    
    RuntimeError: generator ignored GeneratorExit
    ---
    
    
    
    def gen_func():
        """Plain generator without any exception handling around its yields."""
        # 1. can produce values, 2. can receive values (passed in by the caller)
    #     try:
    #         yield "http://projectsedu.com"
    #     except Exception:
    #         pass

        yield "http://projectsedu.com"
        yield 2
        yield 3
        return "lewen"
    
    
    if __name__ == "__main__":
        gen = gen_func()
        print(next(gen))
        gen.close()           # closes the generator; no exception reaches the caller here
        print("lewen")
    
    # ---
    http://projectsedu.com
    lewen
    
    
    gen_close
    def gen_func():
        """Generator that absorbs an exception thrown into its first ``yield``.

        After the handler runs, execution continues to ``yield 2``, whose value
        becomes the return value of the caller's ``throw()`` call.
        """
        # 1. can produce values, 2. can receive values (passed in by the caller)
        try:
            yield "http://projectsedu.com"
        except Exception as e:
            pass
        yield 2
        yield 3
        return "bobby"
    
    if __name__ == "__main__":
        gen = gen_func()
        print(next(gen))
        # Throw an exception instance: the (type, value) call form is
        # deprecated since Python 3.12. The generator catches it and advances
        # to `yield 2`, which throw() returns (not printed here).
        gen.throw(Exception("download error"))
        print(next(gen))
    # ---
    http://projectsedu.com
    3
    
    
        # The generator is now paused at `yield 3`, outside the try block, so
        # the thrown exception propagates back to the caller.
        gen.throw(Exception, "download error")
    # ---
    ---------------------------------------------------------------------------
    Exception                                 Traceback (most recent call last)
    <ipython-input-10-08e213416358> in <module>
    ----> 1 gen.throw(Exception, "download error")
    
    <ipython-input-7-bc909182a9a4> in gen_func()
          6         pass
          7     yield 2
    ----> 8     yield 3
          9     return "bobby"
         10
    
    Exception: download error
    gen_throw

    12.7 生成器进阶-yield from

    # python3.3新加了yield from语法
    from itertools import chain
    
    my_list = [1, 2, 3]
    my_dict = dict(
        lewen1="http://projectsedu.com",
        lewen2="http://www.imooc.com",
    )

    # chain() walks several iterables back to back as if they were one;
    # iterating the dict contributes its keys.
    for value in chain(my_list, my_dict, range(5, 10)):
        print(value)
    
    
    # """
    1
    2
    3
    lewen1
    lewen2
    5
    6
    7
    8
    9
    
    
    def my_chain(*args, **kwargs):
        """Yield the elements of every positional iterable, one iterable after another.

        Keyword arguments are accepted but not used.
        """
        for source in args:
            for element in source:
                yield element
    
    for value in my_chain(my_list, my_dict, range(5, 10)):
        print(value)
    # ---
    1
    2
    3
    lewen1
    lewen2
    5
    6
    7
    8
    9
    
    def my_chain(*args, **kwargs):
        """Yield the elements of every positional iterable by delegating with ``yield from``.

        Keyword arguments are accepted but not used.
        """
        for source in args:
            # Delegate to the iterable instead of looping over it by hand.
            yield from source
    for value in my_chain(my_list, my_dict, range(5, 10)):
        print(value)
    # ---
    1
    2
    3
    lewen1
    lewen2
    5
    6
    7
    8
    9
    chain
    def g1(iterable):
        """Yield the iterable object itself as one single value."""
        yield iterable
    
    def g2(iterable):
        """Yield each element of ``iterable`` in turn via ``yield from``."""
        yield from iterable
    
    for value in g1(range(10)):
        print(value)
    for value in g2(range(10)):
        print(value)
    
    
    # """"""
    range(0, 10)
    0
    1
    2
    3
    4
    5
    6
    7
    8
    9
    yield from iterable
    def g1(gen):
        """Delegating generator: hand control over to the sub-generator ``gen``."""
        yield from gen
    
    
    def main():
        """Caller side of the demo: build the delegating generator and prime it."""

        def sub_gen():
            # Minimal sub-generator so that g1 has something to delegate to.
            yield

        # g1 requires its sub-generator argument; calling it bare raises TypeError.
        g = g1(sub_gen())
        g.send(None)
    
    # 1. main 调用方 g1(委托生成器) gen 子生成器
    # 2. yield from会在调用方与子生成器之间建立一个双向通道
    
    final_result = {}
    def middle(key):
        """Delegating generator that stores each ``sales_sum`` result under ``key``.

        Values sent by the caller are forwarded to ``sales_sum`` by ``yield from``;
        when the sub-generator returns, its return value is written to the
        module-level ``final_result`` and a fresh sub-generator is started.
        """
        while True:
            # The yield from expression evaluates to sales_sum's return value.
            final_result[key] = yield from sales_sum(key)
            print(key+"销量统计完成!!.")
    def sales_sum(pro_name):
        """Sub-generator that accumulates sales figures sent to it.

        Every received value is printed. ``None`` ends the input; any other
        value, including ``0``, is added to the running total.

        Args:
            pro_name: Product name used as the prefix of the printed lines.

        Returns:
            ``(total, nums)``: the sum and the list of received figures, carried
            by ``StopIteration.value`` or by the enclosing ``yield from``.
        """
        total = 0
        nums = []
        while True:
            x = yield  # receive the next figure from the caller
            print(pro_name+"销量: ", x)
            # Only None is the end marker; a falsy figure such as 0 is data.
            if x is None:
                break
            total += x
            nums.append(x)
        return total, nums
    
    def main():
        """Feed each product's sales figures through ``middle`` and print the collected totals."""
        data_sets = {
            "lewen牌面膜": [1200, 1500, 3000],
            "lewen牌手机": [28, 55, 98, 108],
            "lewen牌大衣": [280, 560, 778, 70],
        }
        for product, figures in data_sets.items():
            print("start key:", product)
            delegator = middle(product)
            next(delegator)  # prime the middle coroutine
            for figure in figures:
                delegator.send(figure)  # pass each figure of the group to the coroutine
            delegator.send(None)  # None ends the current group
        print("final_result:", final_result)
    
    if __name__ == '__main__':
        main()
    
    # """"""
    start key: lewen牌面膜
    lewen牌面膜销量:  1200
    lewen牌面膜销量:  1500
    lewen牌面膜销量:  3000
    lewen牌面膜销量:  None
    lewen牌面膜销量统计完成!!.
    start key: lewen牌手机
    lewen牌手机销量:  28
    lewen牌手机销量:  55
    lewen牌手机销量:  98
    lewen牌手机销量:  108
    lewen牌手机销量:  None
    lewen牌手机销量统计完成!!.
    start key: lewen牌大衣
    lewen牌大衣销量:  280
    lewen牌大衣销量:  560
    lewen牌大衣销量:  778
    lewen牌大衣销量:  70
    lewen牌大衣销量:  None
    lewen牌大衣销量统计完成!!.
    final_result: {'lewen牌面膜': (5700, [1200, 1500, 3000]), 'lewen牌手机': (289, [28, 55, 98, 108]), 'lewen牌大衣': (1688, [280, 560, 778, 70])}
     """
    
    
    def sales_sum(pro_name):
        """Generator that accumulates sales figures sent to it.

        Every received value is printed. ``None`` ends the input; any other
        value, including ``0``, is added to the running total.

        Args:
            pro_name: Product name used as the prefix of the printed lines.

        Returns:
            ``(total, nums)``: the sum and the list of received figures, carried
            by ``StopIteration.value``.
        """
        total = 0
        nums = []
        while True:
            x = yield  # receive a value
            print(pro_name+"销量: ", x)
            # Only None is the end marker; a falsy figure such as 0 is data.
            if x is None:
                break
            total += x
            nums.append(x)
        return total, nums
    
    if __name__ == "__main__":
        # Drive sales_sum directly, without a delegating generator in between.
        my_gen = sales_sum("bobby牌手机")
        next(my_gen)  # prime the generator
        my_gen.send(1200)
        my_gen.send(1500)
        my_gen.send(3000)
        try:
            # Without yield from the caller has to catch StopIteration itself
            # to get hold of the return value.
            my_gen.send(None)
        except StopIteration as e:
            result = e.value
            print(result)
    
    # """"""
    bobby牌手机销量:  1200
    bobby牌手机销量:  1500
    bobby牌手机销量:  3000
    bobby牌手机销量:  None
    (5700, [1200, 1500, 3000])
    yield from 例子
    #pep380
    
    #1. RESULT = yield from EXPR可以简化成下面这样
    #一些说明
    """
    _i:子生成器,同时也是一个迭代器
    _y:子生成器生产的值
    _r:yield from 表达式最终的值
    _s:调用方通过send()发送的值
    _e:异常对象
    
    """
    
    _i = iter(EXPR)      # EXPR is an iterable; _i is effectively the sub-generator
    try:
        _y = next(_i)   # prime the sub-generator and keep its first yielded value in _y
    except StopIteration as _e:
        _r = _e.value   # StopIteration right away: its `value` attribute becomes _r (the simplest case)
    else:
        while 1:    # the delegating generator stays suspended inside this loop
            _s = yield _y   # pass the sub-generator's value on; whatever the caller send()s ends up in _s
            try:
                _y = _i.send(_s)    # forward _s and let the sub-generator run on
            except StopIteration as _e:
                _r = _e.value       # sub-generator finished: keep its `value` in _r, leave the loop, resume the delegating generator
                break
    RESULT = _r     # _r is the value of the whole yield from expression
    
    """
    1. 子生成器可能只是一个迭代器,并不是一个作为协程的生成器,所以它不支持.throw()和.close()方法;
    2. 如果子生成器支持.throw()和.close()方法,但是在子生成器内部,这两个方法都会抛出异常;
    3. 调用方让子生成器自己抛出异常
    4. 当调用方使用next()或者.send(None)时,都要在子生成器上调用next()函数,当调用方使用.send()发送非 None 值时,才调用子生成器的.send()方法;
    """
    # Full expansion of RESULT = yield from EXPR, including close()/throw() handling.
    _i = iter(EXPR)
    try:
        _y = next(_i)  # prime the sub-generator
    except StopIteration as _e:
        _r = _e.value
    else:
        while 1:
            try:
                _s = yield _y  # hand the sub-generator's value to the caller
            except GeneratorExit as _e:
                # The delegating generator is being closed: close the
                # sub-generator too if it has a close() method, then re-raise.
                try:
                    _m = _i.close
                except AttributeError:
                    pass
                else:
                    _m()
                raise _e
            except BaseException as _e:
                # Any other exception thrown in is passed to the sub-generator's
                # throw(); without a throw() method it is re-raised here.
                _x = sys.exc_info()
                try:
                    _m = _i.throw
                except AttributeError:
                    raise _e
                else:
                    try:
                        _y = _m(*_x)
                    except StopIteration as _e:
                        # The sub-generator finished while handling the exception.
                        _r = _e.value
                        break
            else:
                try:
                    # None is delivered with next(); any other value with send().
                    if _s is None:
                        _y = next(_i)
                    else:
                        _y = _i.send(_s)
                except StopIteration as _e:
                    _r = _e.value
                    break
    RESULT = _r  # value of the whole yield from expression
    
    """
    看完代码,我们总结一下关键点:
    
    1. 子生成器生产的值,都是直接传给调用方的;调用方通过.send()发送的值都是直接传递给子生成器的;如果发送的是 None,会调用子生成器的__next__()方法,如果不是 None,会调用子生成器的.send()方法;
    2. 子生成器退出的时候,最后的return EXPR,会触发一个StopIteration(EXPR)异常;
    3. yield from表达式的值,是子生成器终止时,传递给StopIteration异常的第一个参数;
    4. 如果调用的时候出现StopIteration异常,委托生成器会恢复运行,同时其他的异常会向上 "冒泡";
    5. 传入委托生成器的异常里,除了GeneratorExit之外,其他的所有异常全部传递给子生成器的.throw()方法;如果调用.throw()的时候出现了StopIteration异常,那么就恢复委托生成器的运行,其他的异常全部向上 "冒泡";
    6. 如果在委托生成器上调用.close()或传入GeneratorExit异常,会调用子生成器的.close()方法,没有的话就不调用。如果在调用.close()的时候抛出了异常,那么就向上 "冒泡",否则的话委托生成器会抛出GeneratorExit异常。
    
    """
    
    yield from 解析

    12.8 async和await

    async def downloader(url):
        """Stub coroutine that returns a fixed string; ``url`` is not used."""
        return "lewen"
    async def download_url(url):
        """Await ``downloader(url)`` and return its result."""
        # do something
        return await downloader(url)
    if __name__ == "__main__":
        coro = download_url("http://www.imooc.com")
        # next(None)    # a coroutine cannot be driven this way
        # Runs the coroutine to completion; its return value surfaces as StopIteration.
        coro.send(None)
    
    ---------------------------------------------------------------------------
    StopIteration                             Traceback (most recent call last)
    <ipython-input-3-879770ebad5e> in <module>
          7 if __name__ == "__main__":
          8     coro = download_url("http://www.imooc.com")
    ----> 9     coro.send(None)
         10
    
    StopIteration: lewen
    
    
    
    
    用yield 可以实现 生成器和协程,但容易混淆,就引入了await关键字
    
    import types
    
    @types.coroutine
    def downloader(url):
        """Generator-based coroutine that yields a fixed string; ``url`` is not used."""
        yield "lewen"
    
    
    async def download_url(url):
        """Await ``downloader(url)`` and return its result."""
        # do something
        return await downloader(url)
    
    
    if __name__ == "__main__":
        coro = download_url("http://www.imooc.com")
        # next(None)    # a coroutine cannot be driven this way
        # Starts the coroutine; it runs until downloader's yield.
        coro.send(None)

    12-9 生成器实现协程

    # 生成器是可以暂停的函数
    import inspect
    
    
    def gen_func():
        """Generator with one ``yield`` and a return value, used to inspect generator states."""
        yield 1

        # value = yield from
        # first: a value is returned to the caller; second: the caller hands a value back to the generator via send()
        return "lewen"
    
    
    # 1. 用同步的方式编写异步的代码, 在适当的时候暂停函数并在适当的时候启动函数
    
    
    if __name__ == "__main__":
        gen = gen_func()
        # State before the generator has been started
        print(inspect.getgeneratorstate(gen))
        next(gen)
        # State while paused at the yield
        print(inspect.getgeneratorstate(gen))
        # Run the generator to its end; the default argument absorbs StopIteration
        next(gen, None)

        # State after the generator has finished
        print(inspect.getgeneratorstate(gen))

        """
        GEN_CREATED
        GEN_SUSPENDED
        GEN_CLOSED
    
        """
    
    gen 状态
    import socket
    
    
    def get_socket_data():
        """Placeholder sub-generator: yields a fixed string and returns nothing."""
        yield "lewen"
    
    
    def downloader(url):
        """Sketch of a generator-based download coroutine for ``url``.

        Starts a non-blocking connect, registers the socket with the
        module-level ``selector`` and then delegates to ``get_socket_data`` for
        the response bytes.

        NOTE(review): the scheduler that resumes this coroutine when the socket
        becomes ready is not part of this sketch, and ``get_socket_data`` is a
        placeholder that returns nothing, so the decode below cannot work until
        it returns real bytes.
        """
        from urllib.parse import urlparse

        host = urlparse(url).netloc  # the host comes from the URL argument

        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client.setblocking(False)

        try:
            client.connect((host, 80))
        except BlockingIOError:
            # A non-blocking connect returns before the handshake is done.
            pass

        # A plain function has no `self`; register the local socket directly.
        selector.register(client.fileno(), EVENT_WRITE)
        source = yield from get_socket_data()

        data = source.decode("utf8")
        # Everything after the first blank line is the body.
        html_data = data.partition("\r\n\r\n")[2]
        print(html_data)
    
    
    def download_html(html):
        """Delegate to ``downloader`` and bind its result.

        NOTE(review): the parameter is named ``html`` but is forwarded as the
        URL to download — confirm the intended argument.
        """
        # downloader requires a URL; calling it without one raises TypeError.
        html = yield from downloader(html)
    
    
    if __name__ == "__main__":
        # 协程的调度依然是 事件循环+协程模式 ,协程是单线程模式
        pass
    yield 实现协程

    -

  • 相关阅读:
    管理这门技术和艺术
    主流双核处理器对比
    20111226自然醒
    message_t
    安装 SQL Server 2008 Management Studio Express
    Linux Ext2/Ext3/Ext4 文件系统分区
    多版本火狐共存方案及火狐配置文档高级管理技巧[转自火狐社区]
    心理学,慢慢学
    system pause in C#
    play with js
  • 原文地址:https://www.cnblogs.com/wenyule/p/10416440.html
Copyright © 2011-2022 走看看