zoukankan      html  css  js  c++  java
  • asyncio 自动跳出长时间堵塞的 task

    https://www.cnblogs.com/ywhyme/p/10660411.html 的升级版
    可以知道当前是卡在哪一个 task 甚至是多少行

    import asyncio
    import os
    import queue
    import signal
    import time
    import threading
    import logging
    
    # logging.basicConfig(level=logging.DEBUG, format="%(asctime)s  %(filename)s : %(levelname)s  %(message)s", )
    logging.basicConfig(level=logging.DEBUG)
    
    # 魔改部分
    # 魔改原因, 在 loop.debug 为 True 的时候才会给 loop 设置 _current_handle
    from asyncio import events
    
    
    class Handle(events.Handle):
        def _run(self):
            self._loop._current_handle = self
            super()._run()
    
    
    events.Handle = Handle
    # 魔改结束
    
    
    async def factorial(name, number):
        f = 1
        for i in range(2, number + 1):
            print(f"Task {name}: Compute factorial({i})...")
            await asyncio.sleep(1)
            f *= i
        print(f"Task {name}: factorial({number}) = {f}")
    
    
    async def test():
        for i in range(100):
            print("sleep--", i)
            time.sleep(1)
    
    
    def handler(signum, frame):
        print('Signal handler called with signal', signum)
        raise Exception("Kill the task")
    
    
    signal.signal(signal.SIGTERM, handler)
    
    
    async def main():
        await asyncio.gather(
            test(),
            factorial("A", 2),
            factorial("B", 3),
            factorial("C", 4),
            return_exceptions=True
        )
    
    
    def check(co_name, threshold: int = 60) -> bool:
        """连续的记录超过阈值"""
        i = 0
        for item in q:
            if item == co_name:
                i += 1
            else:
                break
        if i >= threshold:
            return True
        else:
            return False
    
    
    def asyncio_monitor(loop, step: int = 1):
        while not stop:
            if hasattr(loop._current_handle, "_callback"):
                callback = loop._current_handle._callback
                task = getattr(callback, "__self__")  # Task
    
                co_name = task._coro.cr_code.co_name  # task._coro.cr_code.co_name # coro name
    
                if check(co_name, 10):
                    # 长时间堵塞, 抛出异常让 task 结束
                    if pid != None:
                        os.system(f"kill -{signal.SIGTERM} {pid}")
    
                if task._state == "PENDING":
                    q.appendleft(co_name)
    
                # info 为一个的回显字符串
    
                info = str(getattr(callback, "__self__", callback))
                print(info)
                #
                # coro = task._coro.co_name # coro name
                # task._state  # futures/_base.py:25
                #
    
            time.sleep(step)
    
    
    def run_asyncio(loop):
        global stop
        # loop.set_debug(True)
        loop.run_until_complete(main())
        stop = True
    
    
    if __name__ == '__main__':
        pid = os.getpid()
        print(pid)
        stop = False
    
        q = queue.deque(maxlen=100)
    
        loop = asyncio.get_event_loop()
    
        t1 = threading.Thread(target=asyncio_monitor, args=(loop,))
        t1.start()
    
        run_asyncio(loop)
    
  • 相关阅读:
    [CF707D]Persistent Bookcase_主席树_bitset
    [CF798D]Mike and distribution_贪心
    [LuoguP2164][SHOI2007]交通网络_拓扑排序_概率期望
    [LuoguP3064][USACO12DEC]伊斯坦布尔的帮派Gangs of Istanbull(加强版)_线段树_贪心
    [CF306C]White, Black and White Again_排列组合
    [LuoguP2167][SDOI2009]Bill的挑战_容斥原理/状压dp
    [LuoguP2163][SHOI2007]园丁的烦恼_CDQ分治
    正则字符串插入字符
    [react]
    react 预览pdf 转换
  • 原文地址:https://www.cnblogs.com/twotigers/p/10731771.html
Copyright © 2011-2022 走看看