zoukankan      html  css  js  c++  java
  • gevent源码分析:协程切换(上)

    一、背景

    大家都知道gevent的机制是单线程+协程机制,当遇到可能会阻塞的操作时,就切换到可运行的协程中继续运行,以此来实现提交系统运行效率的目标,但是具体是怎么实现的呢?让我们直接从代码中看一下吧。

    二、切换机制

    让我们从socket的send、recv方法入手:

    def recv(self, *args):
        while 1:
            try:
                return self._sock.recv(*args)
            except error as ex:
                if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
                    raise
                # QQQ without clearing exc_info test__refcount.test_clean_exit fails
                sys.exc_clear()
            self._wait(self._read_event)
    

    这里会开启一个死循环,在循环中调用self._sock.recv()方法,并捕获异常,当错误是EWOULDBLOCK时,则调用self._wait(self._read_event)方法,该方法其实是:_wait = _wait_on_socket_wait_on_socket方法的定义在文件:_hub_primitives.py中,如下:

    # Suitable to be bound as an instance method
    def wait_on_socket(socket, watcher, timeout_exc=None):
        if socket is None or watcher is None:
            # test__hub TestCloseSocketWhilePolling, on Python 2; Python 3
            # catches the EBADF differently.
            raise ConcurrentObjectUseError("The socket has already been closed by another greenlet")
        _primitive_wait(watcher, socket.timeout,
                        timeout_exc if timeout_exc is not None else _NONE,
                        socket.hub)
    

    该方法其实是调用了函数:_primitive_wait(),其仍然在文件:_hub_primitives.py中定义,如下:

    def _primitive_wait(watcher, timeout, timeout_exc, hub):
        if watcher.callback is not None:
            raise ConcurrentObjectUseError('This socket is already used by another greenlet: %r'
                                           % (watcher.callback, ))
    
        if hub is None:
            hub = get_hub()
    
        if timeout is None:
            hub.wait(watcher)
            return
    
        timeout = Timeout._start_new_or_dummy(
            timeout,
            (timeout_exc
             if timeout_exc is not _NONE or timeout is None
             else _timeout_error('timed out')))
    
        with timeout:
            hub.wait(watcher)
    

    这里其实是调用了hub.wait()函数,该函数的定义在文件_hub.py中,如下:

    class WaitOperationsGreenlet(SwitchOutGreenletWithLoop): # pylint:disable=undefined-variable
    
        def wait(self, watcher):
            """
            Wait until the *watcher* (which must not be started) is ready.
    
            The current greenlet will be unscheduled during this time.
            """
            waiter = Waiter(self) # pylint:disable=undefined-variable
            watcher.start(waiter.switch, waiter)
            try:
                result = waiter.get()
                if result is not waiter:
                    raise InvalidSwitchError(
                        'Invalid switch into %s: got %r (expected %r; waiting on %r with %r)' % (
                            getcurrent(), # pylint:disable=undefined-variable
                            result,
                            waiter,
                            self,
                            watcher
                        )
                    )
            finally:
                watcher.stop()
    

    该类WaitOperationsGreenlet是Hub的基类,其方法wait中的逻辑是:生成一个Waiter对象,并调用watcher.start(waiter.switch, waiter)方法,watcher是最开始recv方法中使用的self._read_event,watcher是gevent的底层事件框架libev中的概念;同时还有一个waiter对象,它类似与python中的future概念,该对象有一个switch()方法以及get()方法,当没有得到结果没有准备好时,调用waiter.get()方法回导致协程被挂起;get()函数的定义如下:

    def get(self):
        """If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called."""
        if self._exception is not _NONE:
            if self._exception is None:
                return self.value
            getcurrent().throw(*self._exception) # pylint:disable=undefined-variable
        else:
            if self.greenlet is not None:
                raise ConcurrentObjectUseError('This Waiter is already used by %r' % (self.greenlet, ))
            self.greenlet = getcurrent() # pylint:disable=undefined-variable
            try:
                return self.hub.switch()
            finally:
                self.greenlet = None
    

    在get()中最关键的是self.hub.switch()函数,该函数将执行权转移到hub,并继续运行,至此已经分析完了当在worker协程中从网络获取数据遇到阻塞时,如何避免阻塞并切换到hub中的实现,至于何时再切换会worker协程,我们后续再继续分析。

    总结

    要记得gevent中一个重要的概念,协程切换不是调用而是执行权的转移,从可能会阻塞的协程切换到hub,并由hub在合适的时机切换到另一个可以继续运行的协程继续执行;gevent通过这种形式实现了提高io密集型应用吞吐率的目标。

  • 相关阅读:
    Nginx 部署多个 web 项目(虚拟主机)
    Nginx 配置文件
    Linux 安装 nginx
    Linux 安装 tomcat
    Linux 安装 Mysql 5.7.23
    Linux 安装 jdk8
    Linux 安装 lrzsz,使用 rz、sz 上传下载文件
    springMVC 拦截器
    spring 事务
    基于Aspectj 注解实现 spring AOP
  • 原文地址:https://www.cnblogs.com/lit10050528/p/13549532.html
Copyright © 2011-2022 走看看