zoukankan      html  css  js  c++  java
  • Python 超时重试的方法：基于 signal 实现

    # -*- coding: utf-8 -*-
    
    import functools
    import random
    import signal
    import time

    from retrying import retry
    
    
    def retry_when_timeout(retries=3, timeout=2):
        """Build a decorator that aborts a slow call via SIGALRM and retries it.

        Each attempt arms ``signal.alarm(timeout)``; if the alarm fires before
        the wrapped function returns, ``RetryTimeOutException`` is raised inside
        it and the call is retried by ``retrying.retry``. Exceptions other than
        ``RetryTimeOutException`` are not retried (see ``retry_if_timeout``).

        Args:
            retries: Maximum number of attempts, passed to ``retry`` as
                ``stop_max_attempt_number``.
            timeout: Per-attempt time limit in whole seconds (``signal.alarm``
                only accepts integers).

        Returns:
            A decorator that wraps a function with the timeout/retry behavior.

        Note:
            Relies on ``signal.SIGALRM``, so it is Unix-only, and
            ``signal.signal`` may only be called from the main thread.
        """
        def decor(func):
            def timeout_handle(signum, frame):
                # Runs in the main thread when the alarm fires; the exception
                # propagates out of whatever ``func`` was executing.
                raise RetryTimeOutException("Time out...")

            @retry(stop_max_attempt_number=retries, retry_on_exception=retry_if_timeout)
            @functools.wraps(func)  # keep the wrapped function's name and docstring
            def run(*args, **kwargs):
                # Install our handler and remember the one it replaces.
                previous_handler = signal.signal(signal.SIGALRM, timeout_handle)
                signal.alarm(timeout)  # arm the alarm for this attempt
                print('start alarm signal.')
                try:
                    r = func(*args, **kwargs)
                    print('close alarm signal.')
                    return r
                finally:
                    # Always disarm, even when ``func`` raises: a still-pending
                    # alarm would otherwise fire later in unrelated code.
                    signal.alarm(0)
                    # ``None`` means the old handler was not installed from
                    # Python and cannot be passed back to ``signal.signal``.
                    if previous_handler is not None:
                        signal.signal(signal.SIGALRM, previous_handler)

            return run

        return decor
    
    
    class RetryTimeOutException(Exception):
        """Raised when a call exceeds its time limit and should be retried."""

        def __init__(self, *args, **kwargs):
            """Initialise the exception with its message.

            Positional arguments are forwarded to ``Exception`` so the message
            is available through ``args`` and ``str()``. Keyword arguments are
            accepted and ignored, as before.
            """
            super(RetryTimeOutException, self).__init__(*args)
    
    
    def retry_if_timeout(exception):
        """Decide whether a failed attempt should be retried.

        Returns True only when *exception* is a ``RetryTimeOutException``;
        any other exception type yields False.
        """
        is_timeout = isinstance(exception, RetryTimeOutException)
        return is_timeout
    
    
    @retry_when_timeout(retries=3)
    def do_process():
        """Simulate a request by sleeping for a random 1-4 seconds.

        Prints progress messages around the sleep and returns True once the
        sleep completes.
        """
        print("begin request...")
        delay = random.randint(1, 4)  # both endpoints are inclusive
        print("request sleep time: {}.".format(delay))
        time.sleep(delay)
        print("end request...")
        return True
    
    
    # Script entry point: run the demo request once when executed directly.
    if __name__ == "__main__":
        do_process()
    

      不支持 Windows（signal.SIGALRM 和 signal.alarm 仅在 Unix 系统上可用）。

  • 相关阅读:
    Java多线程、并发
    Java I/O系统
    Java注解
    Arthas(Java诊断工具)
    Java泛型
    Java内部类
    libpcap使用
    python文件服务器
    设计模式
    protobuf
  • 原文地址:https://www.cnblogs.com/dasheng-maritime/p/11604139.html
Copyright © 2011-2022 走看看