zoukankan      html  css  js  c++  java
  • Python线程超时控制

    # #!/usr/bin/env python
    # # coding: utf-8
    # # https://www.cnblogs.com/scolia/p/6132950.html
    import threading
    import time
    import signal
    
    def set_timeout(num):
        def wrap(func):
            def handle(signum, frame):
                signal.alarm(0)
                raise RuntimeError
    
            def todo(self, *args, **kwargs):
                try:
                    print(f"in todo self:{self}")
                    signal.signal(signal.SIGALRM, handle)
                    signal.alarm(num)  # 设置定时器
                    print("设置定时器完成")
                    r = func(self, *args, **kwargs)
                    # signal.alarm(0)  # signal.alarm(0)  # 关闭定时器, 这里不能阻塞
                    # print("关闭定时器")
                    # return r
                except RuntimeError:
                    print(f"signal超时,准备停止线程")
                    self.coo.tmp_thread.stop() # 是否真的调用了stop/
                    print(f"signal超时,停止线程 done")
                    raise Exception
            return todo
        return wrap
    
    
    class Job(threading.Thread):
    
        def __init__(self, *args, **kwargs):
            super(Job, self).__init__(*args, **kwargs)
            self.__running = threading.Event()      # 用于停止线程的标识
            self.__running.set()      # 将running设置为True
            # self.running = True
    
        def run(self):
            while True:
                if self.__running.isSet():
                    print("子线程运行中... ", time.time())
                    time.sleep(1)
                else:
                    print("子线程退出....")
                    print("子线程退出....")
                    return
    
                print("in while True")
    
    
        def stop(self):
            print("调用线程stop")
            time.sleep(1)
            self.__running.clear()        # 设置为False
            # self.running = False
            print("调用线程stop done")
            time.sleep(1)
    
    
    # 线程的停止:注意如果是阻塞的无法完成stop
    class Coo():
    
        def __init__(self):
            self.tmp_thread = None
    
        def execute(self):
            t = Job()
            t.setDaemon(True)
            t.start()
            self.tmp_thread = t
            t.join()
            # time.sleep(8)
            # t.stop()
    
    class CustErr(Exception):
        pass
    
    class Foo():
        def __init__(self):
            self.coo = Coo()
    
        # def handle(self, signum, frame):
        #     signal.alarm(0)
        #     # self.coo.tmp_thread.stop()
        #     raise CustErr
    
        @set_timeout(num=3)
        def foo_main(self):
            self.coo.execute()
            # try:
            #     signal.signal(signal.SIGALRM, self.handle)
            #     signal.alarm(3)
            #     print(f"in foo main self:{self}")
            #     self.coo.execute()
            # except CustErr:
            #     print("catch Cust Err...")
            #     self.coo.tmp_thread.stop()
    
    def main():
        a = Foo()
        a.foo_main()
    
    from multiprocessing import Process
    p1=Process(target=main,args=())
    p1.start()
    
    print("主线程结束退出")
  • 相关阅读:
    Silverlight Tips(2)
    Sl4程序部署至IIS7
    Silverlight Tips(1)
    Silverlight中使用MVVM(5):Command II
    Linq操作之Except,Distinct,Left Join
    Ubuntu搭建Django+Flup+Nginx环境
    ASP.NET 4.0 与 Entity Framework 4第四篇Entity Framework在三层架构中的使用
    玩转博客园的5个小技巧
    Linux下安装Django1.2和MysqlPython
    Linux下如何给Subversion和Mercurial设置HTTP代理
  • 原文地址:https://www.cnblogs.com/yum777/p/14250140.html
Copyright © 2011-2022 走看看