zoukankan      html  css  js  c++  java
  • 协程详解(三)

    好了,准备知识差不多了,接下来就进入正题了,开始讲解协程方面的知识了
    我们先看一段简单的伪代码,这个伪代码已经包含了协程的基本结构和原理
    import socket
    from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
    
    selector = DefaultSelector()
    stopped = False
    r0 = request(0, "/1")
    r1 = request(1,  "/2")
     
    t_list = [r0 ,r1]
     
    r0.send(None)
    r1.send(None)
     
    def request(index, address):
        sock = socket()
        sock.connect(address)
        def on_read():
            print(sock.recv())
            t_list[index].send(None)
    
        selector.register(sock.fileno(), EVENT_READ, on_read)
        yield  # 遇到io,将运行的权利交出去
    
        selector.unregister(sock.fileno())
        print('完成了')
    
    while True:
            events = selector.select()
            for event_key, event_mask in events:
                callback = event_key.data
                callback()
    

      

    代码分析:
    1.程序一开始就注册里两个生成器,r0和r1,并且保存在t_list 这个数组里面
    并且进行了第一次执行
    2.request:
    首先定义了一个socket, 并且连接到传递进来的address
    并且注册了一个读的事件的回调,并且yield出来,跳出生成器,等待下一次执行

    既然已经yield出来,那么生成器下一次执行怎么办,到底需要怎么执行
    所以这里的回调就有些意思了,当数据就绪之后,会执行回调
    t_list[index].send(None) ,
    重新获取到本身这个生成器的引用,然后重新执行一次
    等于是当自己的数据已经就绪好了,就会执行自己的回调,自己的回调就是重新调用一下自己,继续激活自己本身这个生成器

    3. events = selector.select()当收到读的事件,然后遍历所有的事件,提取出它的回调,
    callback = event_key.data
    callback()  并且执行回调


    所以协程的基本原理就是:在遇到 io、阻塞的时候,将运行的权利交出去(例如上面的yield),同时注册唤醒自己的回调;
    当阻塞事件完成的时候,通过一个回调来唤醒程序继续往下走,并且返回io事件的值。这句话记下来,考试要考!!!!

    接下来,我们根据上述的思路更进一步,以一个爬虫的代码来更进一步了解一些协程的设计原理:
    class Future:
        def __init__(self):
            self.result = None
            self._callbacks = []
    
        def add_done_callback(self, fn):
            self._callbacks.append(fn)
    
        def set_result(self, result):
            self.result = result
            for fn in self._callbacks:
                fn(self)
    
        def __iter__(self):
            """
            yield的出现使得__iter__函数变成一个生成器,生成器本身就有next方法,所以不需要额外实现。
            yield from x语句首先调用iter(x)获取一个迭代器(生成器也是迭代器)
            """
            yield self  # 外面使用yield from把f实例本身返回
            return self.result  # 在Task.step中send(result)的时候再次调用这个生成器,但是此时会抛出stopInteration异常,并且把self.result返回

    我们先定义一个future类, 这个类是协程的一个基础了,future的一个作用是什么,是用来存储生成器的值(
    result这个变量就是用来存返回值,当协程执行完毕,会调用set_result方法设置返回值,并且执行future设置的回调函数)
    ,并且作为一个协程回调的管理者,
    这里的内容非常简单,只有一个set_result方法以及add_done_callback(添加回调)
    def connect(sock, address):
        f = Future()
        sock.setblocking(False)
        try:
            sock.connect(address)
        except BlockingIOError:
            pass
    
        def on_connected():
            f.set_result(None)
    
        selector.register(sock.fileno(), EVENT_WRITE, on_connected)
        yield from f
        selector.unregister(sock.fileno())
    这个是一个连接地址的方法,内容非常简单,我们重点关注一下future的使用
    这里面首先定义了一个future,然后注册了socket的写的回调,回调的内容是给future 执行以下 set_result, 这个future没啥内容,所以基本上是不执行什么代码

    yield from f 这句代码就执行结果是执行了future 的__iter__函数的yield self 这一句代码,也就是说,代码yield了一个futrue对象
    def read(sock):
        f = Future()
    
        def on_readable():
            f.set_result(sock.recv(4096))
    
        selector.register(sock.fileno(), EVENT_READ, on_readable)
        """
        此处的chunck接收的是f中return的f.result,同时会跑出一个stopIteration的异常,只不过被yield from处理了。
        这里也可直接写成chunck = yiled f
        """
        chunck = yield from f
        selector.unregister(sock.fileno())
        return chunck

    接下来定了一个读取数据的方法,这个方法也比较简单,首先定义了一个future, 并且注册了socket的读方法的回调函数,这个回调函数是的作用是执行futrue的set_result 并且把数据当做参数传递进去,
    然后yield from f 同样yield了这个future对象,这里因为传递了参数进去,所以当第二次执行read这个生成器的时候,就会执行future的return self.result 这句代码,
    因此就等于chunck就获取到了future的result,也就是说,chunck就获取到了socket读取的数据,然后return chunck 返回给调用方。
      def fetch(url):
            sock = socket.socket()
            yield from connect(sock, ("xkcd.com", 80))
            get = "GET {0} HTTP/1.0
    Host:xkcd.com
    
    ".format(url)
            sock.send(get.encode('ascii'))
            response = yield from read(sock)
            print(response)
    
    接下来定义一个完整的根据url,获取请求数据的代码,首先yield from connect,然后sock.send(get.encode('ascii')),
    当数据写完后,再执行response = yield from read(sock) 获取到response,并且打印出来

    class Task:
        def __init__(self, coro):
            self.coro = coro
            f = Future()
            f.set_result(None)
            self.step(f)  # 激活Task包裹的生成器
    
        def step(self, future):
            try:
                # next_future = self.coro.send(future.result)
                next_future = self.coro.send(None)  # 驱动future
            except StopIteration:
                return
    
            next_future.add_done_callback(self.step)

    然后定义一个task类,这个task类主要的作用有两个
    1. 启动以及唤醒协程
    2. 把对协程进行管理,把协程的启动函数send方法,添加到future的回调函数列表里面
    那么当socket的回调函数执行set_result,set_result除了会设置result之外,也会执行future的回调函数列表里面的回调函数
    当我们把协程的启动函数send放到future的回调的话,当set_result的时候就会重新唤醒协程,继续执行

    def loop():
        while :
            events = selector.select()
            for event_key, event_mask in events:
                callback = event_key.data
                callback()
    这是一个事件循环的雏形,跟上一篇文章一样,就是不断地监控select所注册的socket,假如有socket就绪了,就把就绪的socket的回调取出来,然后执行
    import socket
    from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
    
    selector = DefaultSelector()
    stopped = False
    urls_todo = {"/", "/1", "/2", "/3"}
    if __name__ == "__main__":
        import time
        start = time.time()
        for url in urls_todo:
            Task(fetch(url))
        loop()
        print(time.time() - start)
    然后这是入口函数,我们来一句一句的分析一下这个整个代码执行了什么
    1. 首先我们初始化了三个生成器fetch(url),并且传递到task,创建了三个task
    并且三个task都执行了next_future = self.coro.send(None),开始了生成器的第一次执行
    2. 生成器第一次执行的位置是yield from connect(sock, ("xkcd.com", 80)) --> connect函数里面的
        f = Future()
        sock.setblocking(False)
        try:
            sock.connect(address)
        except BlockingIOError:
            pass
    
        def on_connected():
            f.set_result(None)
    
        selector.register(sock.fileno(), EVENT_WRITE, on_connected)
        yield from f
    进行了socket的连接,以及写事件的回调注册,这里的回调注册仅仅是把future给set_result
    然后yield from f会执行future里面的_iter_函数并且yield自身,也就是执行到这里
    函数yield出来了一个future对象
    3.next_future = self.coro.send(None) 所以这一句获取的值就是一个future对象,
    next_future.add_done_callback(self.step) 就给这个future对象分别设置了task.step添加到future的回调函数列表
    也就说说 三个生成器分别产生了三个future对象,同时三个future都把对应的生成器的启动函数放到了future的回调函数列表

    4. 然后代码执行到loop()这里,loop会进行select操作
    当三个socket分别都连接完毕之后,就会返回对应的socket对象,然后分别把socket的回调函数取出来,进行执行
    三个回调函数的内容都是给 f.set_result(None), future执行完set_result后,会执行
    for fn in self._callbacks:
                fn(self)
    回调列表里面的所有的回调函数
    三个future的回调函数前面也说了,就是task的self.step 方法,也就说生成器的send方法,也是分别对生成器进行执行

    5. 然后生成器进行执行,到下面这里,进行url请求,并且执行read函数,
    get = "GET {0} HTTP/1.0
    Host:xkcd.com
    
    ".format(url)
    sock.send(get.encode('ascii'))
    response = yield from read(sock)
    进而执行到read函数里面,新建了future,并且注册socket的读取回调,回调函数是给future set_result 读取到的数据
    f = Future()
    def on_readable():
       f.set_result(sock.recv(4096))
    selector.register(sock.fileno(), EVENT_READ, on_readable)
    chunck = yield from f
    跟前面步骤一样,然后yield出来一个future对象

    6. 接下来继续执行step下面的代码,继续把step注册到future的回调函数列表里面
    next_future.add_done_callback(self.step)
    然后三个future都执行了相同的步骤
    7. 执行到这里就继续进行loop函数的循环,继续select操作,当socket接收到数据之后,然后取出socket,执行socket的回调函数on_readable 里面的f.set_result(sock.recv(4096))
    future保存了读取到的数到result变量里面,然后执行future回调列表里面的函数,回调函数也是继续激活生成器,执行的代码就是future的return self.result
    就说read函数里面的
    chunck = yield from f  把future结果赋值给了chunck
    selector.unregister(sock.fileno())
    return chunck   返回了chunck

    然后继续返回到了fetch函数的
    response = yield from read(sock)
    print(response)

    把读取出来的结果打印出来了,至此,整个程序也就执行完毕了,三个生成器分别完成了url的连接,数据读取的完整步骤

    以上就是协程的基本流程了,python的asyncio的结构也是差不多,执行逻辑也是差不多,只是async在这样的逻辑基础上进行了更加复杂的功能,做了很多优化,上面的各个函数都能够对应得了asyncio的各个class
    task对应着 asyncio的 Task 类
    loop对应着 asyncio的事件循环
    future对应着 asyncio的Future 类
    fetch 对应着 asyncio的协程
    至于asyncio后续找时间对它的源码分析一下,是完全可以跟上面的例子对应的上的




      



  • 相关阅读:
    python移动文件,将一个文件夹里面的文件移动到另一个文件夹
    python中的os.path.join, os.path.splitext, os.path.split, split()函数用法
    python遍历目录下的所有目录和文件, python解析json文件, python-opencv截取子图
    Python对文件进行批量重命名
    python遍历目录下的所有目录和文件,并用opencv从mp4文件中抽帧得到图片
    python中的os.walk()方法学习
    ubuntu16.04 安装caffe时出现 .build_release/tools/caffe: error while loading shared libraries: libcudart.so.10.0: cannot open shared object file: No such file or directory
    caffe编译过程中的错误: nvcc fatal : Unsupported gpu architecture 'compute_20'
    Java字符串为""和null的区别
    一次docker中的nginx进程响应慢问题定位记录
  • 原文地址:https://www.cnblogs.com/wilken/p/14233000.html
Copyright © 2011-2022 走看看