  • Automatic restart of Gearman tasks, implemented with a JSON file

    1. When a Gearman task fails, call task_failed

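    # Imports used by the functions in this post.
    import json
    import os
    import subprocess
    import time
    from pathlib import Path

    def task_failed(task, *args):
        # Join any extra detail strings, one per line.
        info = '\n'.join(args)
        # local_datetime() is the author's own helper (not shown in the post).
        timestamp = local_datetime()
        text = '{} FAILED:\n{}\nThe responding worker has shut down\n{}\n-->[{}]'.format(
            task, info, timestamp, task)
        print(text)
        check_frequency(task)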

    2. After printing the failure message, call check_frequency to check how many times the task has been restarted within the last 5 minutes

    def check_frequency(task):
        instance = TaskReloadMonitor()
        now = time.time()
        task_info = instance.open(task.lower())
        if not task_info:
            # Unknown task, or task.json could not be read: do nothing.
            return
        worker = task_info.get('worker')
        last_time = task_info.get('last_time')
        if not last_time or int(now) - int(last_time) > 300:
            # First restart, or the previous one was more than 5 minutes ago:
            # reset the counter.
            timer = 1
        else:
            timer = task_info.get('timer', 0) + 1
            if timer > 3:
                # Already restarted 3 times, each within 5 minutes of the
                # previous one: give up.
                return
        task_info['timer'] = timer
        task_info['last_time'] = now
        instance.write()
        task_reload(task, worker, timer)
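
The restart rule in check_frequency can be separated from the file I/O, which makes it easier to test. The following is a minimal sketch, not the author's code: `next_timer`, `WINDOW_SECONDS`, and `MAX_RESTARTS` are hypothetical names, while the 300-second window and the limit of 3 are taken from the function above.

```python
WINDOW_SECONDS = 300  # 5-minute window, as in check_frequency
MAX_RESTARTS = 3      # restart limit, as in check_frequency


def next_timer(task_info, now):
    """Return the new restart counter, or None if the task must not be restarted.

    Hypothetical helper: mirrors the branching in check_frequency without
    touching task.json.
    """
    last_time = task_info.get('last_time')
    if not last_time or int(now) - int(last_time) > WINDOW_SECONDS:
        # First restart, or the previous one is old enough: reset the counter.
        return 1
    timer = task_info.get('timer', 0) + 1
    return timer if timer <= MAX_RESTARTS else None
```

With a helper like this, check_frequency would only need to load the task entry, call `next_timer`, and write the result back when it is not None.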

    3. Once the restart is confirmed, restart the task with subprocess; subprocess.Popen runs the command without blocking

    def task_reload(task, worker, timer):
        from coursepoints.settings import BASE_DIR
        manage = os.path.join(BASE_DIR, 'manage.py')
        timestamp = local_datetime()
        command = 'python {} {}'.format(manage, worker)
        # Popen returns immediately; the worker keeps running in the background.
        subprocess.Popen(command, shell=True)
        text = '-->task reload:{}\n-->timer:{}\n-->{}'.format(task, timer, timestamp)
        print(text)

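
To illustrate the non-blocking behaviour of subprocess.Popen described in step 3, here is a small sketch. It is an illustration under assumptions rather than the author's code: `launch_worker` is a hypothetical helper, and a child that sleeps for one second stands in for `manage.py <worker>`. It passes the command as a list starting with `sys.executable`, which avoids `shell=True` and runs the child with the same interpreter as the parent.

```python
import subprocess
import sys


def launch_worker(argv):
    """Start a child Python process and return immediately (hypothetical helper)."""
    # Popen does not wait for the child; it returns a handle right away.
    return subprocess.Popen([sys.executable] + list(argv))


if __name__ == '__main__':
    proc = launch_worker(['-c', 'import time; time.sleep(1)'])
    # poll() returns None for as long as the child has not exited.
    print(proc.poll())
    # Block only when we explicitly choose to.
    proc.wait()
    print(proc.returncode)
```

In task_reload, the equivalent call would be `launch_worker([manage, worker])`; whether the worker should inherit the parent's interpreter is a deployment decision the original post does not address.
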
    class TaskReloadMonitor():
        """Reads and writes the per-task restart state stored in task.json."""

        @property
        def path(self):
            # task.json sits next to this module.
            return Path(__file__).parent.joinpath('task.json')

        def open(self, task):
            try:
                with open(self.path, 'r', encoding='utf8') as f:
                    data = json.load(f)
                # Keep the whole document so write() can save it back later.
                self.task_data = data
                return data.get(task)
            except Exception as e:
                print(e)
                return None

        def write(self):
            try:
                with open(self.path, 'w', encoding='utf8') as f:
                    json.dump(self.task_data, f)
            except Exception as e:
                print(e)
    Reading and writing the JSON file
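
Because write() opens task.json in 'w' mode, the file is truncated before the new content is written, so a crash at that moment could leave it empty or half-written and the next open() would fail to parse it. One possible hardening, sketched here as an assumption and not part of the original post, is to write to a temporary file in the same directory and swap it in with os.replace. `load_state` and `save_state` are hypothetical names.

```python
import json
import os
import tempfile
from pathlib import Path


def load_state(path):
    """Return the parsed JSON state, or an empty dict if the file is missing."""
    try:
        with open(path, 'r', encoding='utf8') as f:
            return json.load(f)
    except FileNotFoundError:
        return {}


def save_state(path, data):
    """Write data as JSON to a temporary file, then replace path with it."""
    path = Path(path)
    # Create the temporary file next to the target so both are on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=str(path.parent), suffix='.tmp')
    try:
        with os.fdopen(fd, 'w', encoding='utf8') as f:
            json.dump(data, f, indent=2)
        # os.replace swaps the file in a single step on the same filesystem.
        os.replace(tmp_name, path)
    except BaseException:
        # Clean up the temporary file if anything went wrong, then re-raise.
        os.unlink(tmp_name)
        raise
```

Readers then see either the old file or the new one, never a partially written one. This does not protect against two processes updating the file at the same moment; that would need a lock, which is outside the scope of the original post.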
    {
      "pptconvert": {
        "worker": "pptconvert",
        "timer": 1,
        "last_time": 1555356612.9220166
      },
      "screencapture": {
        "worker": "screencapture",
        "timer": 0
      },
      "snapscreen": {
        "worker": "snapscreen",
        "timer": 1,
        "last_time": 1555441223.166838
      }
    }
    Contents of the JSON file (task.json)
  • Original article: https://www.cnblogs.com/li1992/p/10723927.html