zoukankan      html  css  js  c++  java
  • gevent.hub.BlockingSwitchOutError: Impossible to call blocking function in the event loop callback

    最近一个 python 项目中同时用到了 gevent 和 multiprocessing。在优雅退出的实现上,出现了一些预料之外的问题。

    一个简化版的代码,启动了4 个进程,每个进程里启动了两个协程,并注册了 SIGINT 等信号的回调函数来实现优雅退出:

    import signal
    import time
    import multiprocessing
    import gevent
    from gevent import monkey
    monkey.patch_all()  # NOQA
    
    
    class WorkerManager():
        def __init__(self):
            self.is_running = multiprocessing.Value('b', True)
    
        def job(self):
            while self.is_running.value:
                print("job")
                time.sleep(3)
    
        def run(self):
            for sig in [signal.SIGINT, signal.SIGUSR1, signal.SIGTERM]:
                signal.signal(sig, signal.SIG_IGN)
    
            jobs = [gevent.spawn(self.job) for _ in range(2)]
            gevent.joinall(jobs)
    
        def start(self):
            self.workers = [multiprocessing.Process(
                target=self.run) for _ in range(4)]
            for worker in self.workers:
                worker.start()
            signal.signal(signal.SIGINT, self.graceful_exit)
    
        def graceful_exit(self, sig, frame):
            self.shutdown()
     
        def shutdown(self):
            if not self.is_running.value:
                return
            self.is_running.value = False
            for worker in self.workers:
                worker.join()
            
    
    worker_manager = WorkerManager()
    worker_manager.start()
    

    上面的代码运行后,按ctrl+c会报下面的错误:

    gevent.hub.BlockingSwitchOutError: Impossible to call blocking function in the event loop callback
    

    相关的调用栈

      File "/usr/local/lib/python3.7/multiprocessing/popen_fork.py", line 28, in poll
        pid, sts = os.waitpid(self.pid, flag)
      File ".../venv/lib/python3.7/site-packages/gevent/os.py", line 380, in waitpid
        get_hub().wait(new_watcher)
      File "src/gevent/_hub_primitives.py", line 46, in gevent._gevent_c_hub_primitives.WaitOperationsGreenlet.wait
      File "src/gevent/_hub_primitives.py", line 55, in gevent._gevent_c_hub_primitives.WaitOperationsGreenlet.wait
      File "src/gevent/_waiter.py", line 154, in gevent._gevent_c_waiter.Waiter.get
      File "src/gevent/_greenlet_primitives.py", line 61, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
      File "src/gevent/_greenlet_primitives.py", line 61, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
      File "src/gevent/_greenlet_primitives.py", line 64, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
      File "src/gevent/_greenlet_primitives.py", line 67, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch_out
      File "src/gevent/_greenlet_primitives.py", line 68, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch_out
    

    背景知识

    • 信号处理的原理是操作系统会把信号发给进程和该进程的子进程,每个进程原来的逻辑就中断了,然后调用我们注册的信号回调函数来处理。如果这个进程里有多个线程,那么此刻跑的是哪个线程,就从哪个线程中断。

    • gevent 是一个流行的 python 网络库,主要的功能就是在 python 中提供了一些事件循环的接口。它是基于 greenlet 实现的。greenlet 也可以理解为协程,就像 golang 里的 goroutine。

    • greenlet 的功能就是提供了在不同调用栈之间切换(switch)的能力。比如一会执行这个协程,然后它要阻塞等待一些 IO 操作,那就主动切换到另一个协程的调用栈去执行另一个协程。而 gevent 就对 greenlet 进行了一层封装,我们只用调用 gevent.spawn() 就可以创建并运行协程,gevent 会帮我们调度。gevent 还封装了一些操作系统自带的函数,比如 sleep。

    • 每个 greenlet 都会在一个线程上,一个线程上可以有多个 greenlet,但一次只有一个 greenlet 在运行。

    • 对于每个协程,都需要在一个 hub 里运行,hub 被翻译为集线器,hub 也是一个 greenlet,为什么又要搞个 greenlet 呢,因为它是帮我们做切换调用栈的家伙。

    • hub 里运行着事件循环(loop),什么是事件循环呢?就是说操作系统会发出事件通知你的程序,比如一个 socket 可以读了,你的程序就可以做相应处理。这种注册事件、等待着并在事件发生时做处理的流程就是事件循环。gevent 是基于 libev 这个库实现事件循环的。

    • 当我们调用 spawn 时,会创建一个新的 greenlet,并在 hub 里注册事件,事件循环收到事件通知时,就会调用我们的回调函数。而如果回调函数里有一个 sleep 之类的阻塞事件,gevent 的实现中就会进行 switch 操作,也就是切到 hub,等阻塞操作完成,就又会从 hub 里切换回来。

    • 调用 join 或 joinall 时,就会切换到 hub 里,会启动事件轮询来等待协程结束。

    原因

    回到我们的代码里,我们用了gevent 的 monkey.patch_all(),并且用到了 multiprocessing,而出错的调用栈中可以看到问题出在对子进程 join 时,这个 join 函数在 multiprocess 库里,调用了 os.waitpid,这里就会调用 gevent 实现的 os,由于是个阻塞操作,就会在 switch_out 时出错。为什么出错呢?这是 gevent 里相关的代码:

    class SwitchOutGreenletWithLoop(TrackedRawGreenlet):
        # Subclasses must define:
        # - self.loop
    
        # This class defines loop in its .pxd for Cython. This lets us avoid
        # circular dependencies with the hub.
    
        def switch(self):
            switch_out = getattr(getcurrent(), 'switch_out', None) # pylint:disable=undefined-variable
            if switch_out is not None:
                switch_out()
            return _greenlet_switch(self) # pylint:disable=undefined-variable
    
        def switch_out(self):
            raise BlockingSwitchOutError('Impossible to call blocking function in the event loop callback')
    
    

    因为我们的程序收到信号中断时,主进程里没有其他的 greenlet,主进程里也没有其它运行的东西,所以运行着的是 hub 本身这个 greenlet,它会在一个线程里运行。所以 switch_out 时会找之前在跑的 greenlet(getcurrent()这个代码),结果就是 hub 本身。

    一般 switch_out 是用来从一个普通的 greenlet 切换到 hub 里的,现在从 hub 里无法再切换到其它地方了。所以就是‘BlockingSwitchOutError’ 错误了。

    参考:

    解决方法

    1. 既然是 hub 里无法切出去,那我们可以把 shutdown 放到一个 greenlet 里:
        def graceful_exit(self, sig, frame):
            gevent.spawn(self.shutdown)
    

    但如果主进程了没别的在跑,可能不会等 shutdown 运行完。

    ps:一不小心写成了 self.shutdown(),后面加上了(),就和没改一样,所以报了一样的错了。

    1. 也可以不让 gevent 影响 multiprocess 里的 os 函数
    monkey.patch_all(os=False)
    
    1. 还可以使用 gevent 提供的 signal 处理函数,它会在一个新的 greenlet 里运行。注意 monkey patch 不会修补默认的 signal.signal 函数。
    gevent.signal_handler(signal.SIGINT, self.graceful_exit, signal.SIGINT, None)
    

    这种方法同样的可能不会等 shutdown 运行完。。

    1. 让代码最后加上这段也可以。因为这样,主进程就忙着跑 while True 了,而没有切到 hub 所在的线程。 但是占 cpu 资源。
    while True:
        pass
    
    1. 可以将 join 的代码拿出来,在主线程里调用,不要放到回调函数里,不然会被 hub 线程运行。
        def join(self):
            for worker in self.worker_process:
                worker.join()
                worker.close()
    
        def shutdown(self):
            if not self.is_running.value:
                return
            self.is_running.value = False
    ...
    
    worker_manager.join()
    

    ┆凉┆暖┆降┆等┆幸┆我┆我┆里┆将┆ ┆可┆有┆谦┆戮┆那┆ ┆大┆始┆ ┆然┆
    ┆薄┆一┆临┆你┆的┆还┆没┆ ┆来┆ ┆是┆来┆逊┆没┆些┆ ┆雁┆终┆ ┆而┆
    ┆ ┆暖┆ ┆如┆地┆站┆有┆ ┆也┆ ┆我┆ ┆的┆有┆精┆ ┆也┆没┆ ┆你┆
    ┆ ┆这┆ ┆试┆方┆在┆逃┆ ┆会┆ ┆在┆ ┆清┆来┆准┆ ┆没┆有┆ ┆没┆
    ┆ ┆生┆ ┆探┆ ┆最┆避┆ ┆在┆ ┆这┆ ┆晨┆ ┆的┆ ┆有┆来┆ ┆有┆
    ┆ ┆之┆ ┆般┆ ┆不┆ ┆ ┆这┆ ┆里┆ ┆没┆ ┆杀┆ ┆来┆ ┆ ┆来┆
  • 相关阅读:
    利用Trace.WriteLine定位难以重现的问题
    技术经验分享
    辞职小记
    残阳如血--读《忆秦娥·娄山关》 有感
    一个简单多线程等待窗口
    [转]Control的Invoke和BeginInvoke
    elk 改为使用 ik 中文分词器
    在 jenkins 的 pipeline 中使用分支参数
    centos8 下删除网桥 docker0
    vscode 实现组件之间的跳转
  • 原文地址:https://www.cnblogs.com/flipped/p/15500530.html
Copyright © 2011-2022 走看看