最近一个 python 项目中同时用到了 gevent 和 multiprocessing。在优雅退出的实现上,出现了一些预料之外的问题。
一个简化版的代码,启动了4 个进程,每个进程里启动了两个协程,并注册了 SIGINT 等信号的回调函数来实现优雅退出:
import signal
import time
import multiprocessing
import gevent
from gevent import monkey
monkey.patch_all() # NOQA
class WorkerManager():
def __init__(self):
self.is_running = multiprocessing.Value('b', True)
def job(self):
while self.is_running.value:
print("job")
time.sleep(3)
def run(self):
for sig in [signal.SIGINT, signal.SIGUSR1, signal.SIGTERM]:
signal.signal(sig, signal.SIG_IGN)
jobs = [gevent.spawn(self.job) for _ in range(2)]
gevent.joinall(jobs)
def start(self):
self.workers = [multiprocessing.Process(
target=self.run) for _ in range(4)]
for worker in self.workers:
worker.start()
signal.signal(signal.SIGINT, self.graceful_exit)
def graceful_exit(self, sig, frame):
self.shutdown()
def shutdown(self):
if not self.is_running.value:
return
self.is_running.value = False
for worker in self.workers:
worker.join()
worker_manager = WorkerManager()
worker_manager.start()
上面的代码运行后,按ctrl+c会报下面的错误:
gevent.hub.BlockingSwitchOutError: Impossible to call blocking function in the event loop callback
相关的调用栈
File "/usr/local/lib/python3.7/multiprocessing/popen_fork.py", line 28, in poll
pid, sts = os.waitpid(self.pid, flag)
File ".../venv/lib/python3.7/site-packages/gevent/os.py", line 380, in waitpid
get_hub().wait(new_watcher)
File "src/gevent/_hub_primitives.py", line 46, in gevent._gevent_c_hub_primitives.WaitOperationsGreenlet.wait
File "src/gevent/_hub_primitives.py", line 55, in gevent._gevent_c_hub_primitives.WaitOperationsGreenlet.wait
File "src/gevent/_waiter.py", line 154, in gevent._gevent_c_waiter.Waiter.get
File "src/gevent/_greenlet_primitives.py", line 61, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
File "src/gevent/_greenlet_primitives.py", line 61, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
File "src/gevent/_greenlet_primitives.py", line 64, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
File "src/gevent/_greenlet_primitives.py", line 67, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch_out
File "src/gevent/_greenlet_primitives.py", line 68, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch_out
背景知识
-
信号处理的原理是操作系统会把信号发给进程和该进程的子进程,每个进程原来的逻辑就中断了,然后调用我们注册的信号回调函数来处理。如果这个进程里有多个线程,那么此刻跑的是哪个线程,就从哪个线程中断。
-
gevent 是一个流行的 python 网络库,主要的功能就是在 python 中提供了一些事件循环的接口。它是基于 greenlet 实现的。greenlet 也可以理解为协程,就像 golang 里的 goroutine。
-
greenlet 的功能就是提供了在不同调用栈之间切换(switch)的能力。比如一会执行这个协程,然后它要阻塞等待一些 IO 操作,那就主动切换到另一个协程的调用栈去执行另一个协程。而 gevent 就对 greenlet 进行了一层封装,我们只用调用 gevent.spawn() 就可以创建并运行协程,gevent 会帮我们调度。gevent 还封装了一些操作系统自带的函数,比如 sleep。
-
每个 greenlet 都会在一个线程上,一个线程上可以有多个 greenlet,但一次只有一个 greenlet 在运行。
-
对于每个协程,都需要在一个 hub 里运行,hub 被翻译为集线器,hub 也是一个 greenlet,为什么又要搞个 greenlet 呢,因为它是帮我们做切换调用栈的家伙。
-
hub 里运行着事件循环(loop),什么是事件循环呢?就是说操作系统会发出事件通知你的程序,比如一个 socket 可以读了,你的程序就可以做相应处理。这种注册事件、等待着并在事件发生时做处理的流程就是事件循环。gevent 是基于 libev 这个库实现事件循环的。
-
当我们调用 spawn 时,会创建一个新的 greenlet,并在 hub 里注册事件,事件循环收到事件通知时,就会调用我们的回调函数。而如果回调函数里有一个 sleep 之类的阻塞事件,gevent 的实现中就会进行 switch 操作,也就是切到 hub,等阻塞操作完成,就又会从 hub 里切换回来。
-
调用 join 或 joinall 时,就会切换到 hub 里,会启动事件轮询来等待协程结束。
原因
回到我们的代码里,我们用了gevent 的 monkey.patch_all()
,并且用到了 multiprocessing,而出错的调用栈中可以看到问题出在对子进程 join 时,这个 join 函数在 multiprocess 库里,调用了 os.waitpid
,这里就会调用 gevent 实现的 os,由于是个阻塞操作,就会在 switch_out 时出错。为什么出错呢?这是 gevent 里相关的代码:
class SwitchOutGreenletWithLoop(TrackedRawGreenlet):
# Subclasses must define:
# - self.loop
# This class defines loop in its .pxd for Cython. This lets us avoid
# circular dependencies with the hub.
def switch(self):
switch_out = getattr(getcurrent(), 'switch_out', None) # pylint:disable=undefined-variable
if switch_out is not None:
switch_out()
return _greenlet_switch(self) # pylint:disable=undefined-variable
def switch_out(self):
raise BlockingSwitchOutError('Impossible to call blocking function in the event loop callback')
因为我们的程序收到信号中断时,主进程里没有其他的 greenlet,主进程里也没有其它运行的东西,所以运行着的是 hub 本身这个 greenlet,它会在一个线程里运行。所以 switch_out 时会找之前在跑的 greenlet(getcurrent()这个代码),结果就是 hub 本身。
一般 switch_out 是用来从一个普通的 greenlet 切换到 hub 里的,现在从 hub 里无法再切换到其它地方了。所以就是‘BlockingSwitchOutError’ 错误了。
参考:
- github - Can not join thread in signal handler after monkey patching #799
- CSDN 博客 - gevent: AssertionError: Impossible to call blocking function in the event loop callback
- github - Gracefully shutdown server on SIGTERM from docker or kubernetes #1817
解决方法
- 既然是 hub 里无法切出去,那我们可以把 shutdown 放到一个 greenlet 里:
def graceful_exit(self, sig, frame):
gevent.spawn(self.shutdown)
但如果主进程了没别的在跑,可能不会等 shutdown 运行完。
ps:一不小心写成了 self.shutdown()
,后面加上了(),就和没改一样,所以报了一样的错了。
- 也可以不让 gevent 影响 multiprocess 里的 os 函数
monkey.patch_all(os=False)
- 还可以使用 gevent 提供的 signal 处理函数,它会在一个新的 greenlet 里运行。注意 monkey patch 不会修补默认的 signal.signal 函数。
gevent.signal_handler(signal.SIGINT, self.graceful_exit, signal.SIGINT, None)
这种方法同样的可能不会等 shutdown 运行完。。
- 让代码最后加上这段也可以。因为这样,主进程就忙着跑 while True 了,而没有切到 hub 所在的线程。 但是占 cpu 资源。
while True:
pass
- 可以将 join 的代码拿出来,在主线程里调用,不要放到回调函数里,不然会被 hub 线程运行。
def join(self):
for worker in self.worker_process:
worker.join()
worker.close()
def shutdown(self):
if not self.is_running.value:
return
self.is_running.value = False
...
worker_manager.join()