zoukankan      html  css  js  c++  java
  • gevent源码分析:协程切换(上)

    一、背景

    大家都知道gevent的机制是单线程+协程机制,当遇到可能会阻塞的操作时,就切换到可运行的协程中继续运行,以此来实现提交系统运行效率的目标,但是具体是怎么实现的呢?让我们直接从代码中看一下吧。

    二、切换机制

    让我们从socket的send、recv方法入手:

    def recv(self, *args):
        while 1:
            try:
                return self._sock.recv(*args)
            except error as ex:
                if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
                    raise
                # QQQ without clearing exc_info test__refcount.test_clean_exit fails
                sys.exc_clear()
            self._wait(self._read_event)
    

    这里会开启一个死循环,在循环中调用self._sock.recv()方法,并捕获异常,当错误是EWOULDBLOCK时,则调用self._wait(self._read_event)方法,该方法其实是:_wait = _wait_on_socket_wait_on_socket方法的定义在文件:_hub_primitives.py中,如下:

    # Suitable to be bound as an instance method
    def wait_on_socket(socket, watcher, timeout_exc=None):
        if socket is None or watcher is None:
            # test__hub TestCloseSocketWhilePolling, on Python 2; Python 3
            # catches the EBADF differently.
            raise ConcurrentObjectUseError("The socket has already been closed by another greenlet")
        _primitive_wait(watcher, socket.timeout,
                        timeout_exc if timeout_exc is not None else _NONE,
                        socket.hub)
    

    该方法其实是调用了函数:_primitive_wait(),其仍然在文件:_hub_primitives.py中定义,如下:

    def _primitive_wait(watcher, timeout, timeout_exc, hub):
        if watcher.callback is not None:
            raise ConcurrentObjectUseError('This socket is already used by another greenlet: %r'
                                           % (watcher.callback, ))
    
        if hub is None:
            hub = get_hub()
    
        if timeout is None:
            hub.wait(watcher)
            return
    
        timeout = Timeout._start_new_or_dummy(
            timeout,
            (timeout_exc
             if timeout_exc is not _NONE or timeout is None
             else _timeout_error('timed out')))
    
        with timeout:
            hub.wait(watcher)
    

    这里其实是调用了hub.wait()函数,该函数的定义在文件_hub.py中,如下:

    class WaitOperationsGreenlet(SwitchOutGreenletWithLoop): # pylint:disable=undefined-variable
    
        def wait(self, watcher):
            """
            Wait until the *watcher* (which must not be started) is ready.
    
            The current greenlet will be unscheduled during this time.
            """
            waiter = Waiter(self) # pylint:disable=undefined-variable
            watcher.start(waiter.switch, waiter)
            try:
                result = waiter.get()
                if result is not waiter:
                    raise InvalidSwitchError(
                        'Invalid switch into %s: got %r (expected %r; waiting on %r with %r)' % (
                            getcurrent(), # pylint:disable=undefined-variable
                            result,
                            waiter,
                            self,
                            watcher
                        )
                    )
            finally:
                watcher.stop()
    

    该类WaitOperationsGreenlet是Hub的基类,其方法wait中的逻辑是:生成一个Waiter对象,并调用watcher.start(waiter.switch, waiter)方法,watcher是最开始recv方法中使用的self._read_event,watcher是gevent的底层事件框架libev中的概念;同时还有一个waiter对象,它类似与python中的future概念,该对象有一个switch()方法以及get()方法,当没有得到结果没有准备好时,调用waiter.get()方法回导致协程被挂起;get()函数的定义如下:

    def get(self):
        """If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called."""
        if self._exception is not _NONE:
            if self._exception is None:
                return self.value
            getcurrent().throw(*self._exception) # pylint:disable=undefined-variable
        else:
            if self.greenlet is not None:
                raise ConcurrentObjectUseError('This Waiter is already used by %r' % (self.greenlet, ))
            self.greenlet = getcurrent() # pylint:disable=undefined-variable
            try:
                return self.hub.switch()
            finally:
                self.greenlet = None
    

    在get()中最关键的是self.hub.switch()函数,该函数将执行权转移到hub,并继续运行,至此已经分析完了当在worker协程中从网络获取数据遇到阻塞时,如何避免阻塞并切换到hub中的实现,至于何时再切换会worker协程,我们后续再继续分析。

    总结

    要记得gevent中一个重要的概念,协程切换不是调用而是执行权的转移,从可能会阻塞的协程切换到hub,并由hub在合适的时机切换到另一个可以继续运行的协程继续执行;gevent通过这种形式实现了提高io密集型应用吞吐率的目标。

  • 相关阅读:
    没有spring如何使用注解下篇
    在没有spring框架如何使用注解上篇
    oracle11g里sqldeveloper不能打开的问题
    java代码换行
    枚举接口Enumeration
    java开发环境的搭建(上班笔记01)
    2013.12.12-2013.12.22面试
    2013.12.12-2013.12.20面试
    supervisor superlance
    Laravel 返回日期问题2021-07-23T05:56:03.000000Z
  • 原文地址:https://www.cnblogs.com/lit10050528/p/13549532.html
Copyright © 2011-2022 走看看