zoukankan      html  css  js  c++  java
  • python:定时任务模块schedule

    1.安装

    pip install schedule

    2.文档

    https://schedule.readthedocs.io/en/stable/faq.html#how-to-execute-jobs-in-parallel

    3.官网使用demo

    import schedule
    import time
    
    def job():
        """Print a fixed message so that each scheduled run is visible on stdout."""
        message = "I'm working..."
        print(message)
    
    # Register the same job under several different schedules.
    # Every 10 minutes.
    schedule.every(10).minutes.do(job)
    # Every hour.
    schedule.every().hour.do(job)
    # Every day at 10:30.
    schedule.every().day.at("10:30").do(job)
    # At an interval of 5 to 10 minutes.
    schedule.every(5).to(10).minutes.do(job)
    # Every Monday.
    schedule.every().monday.do(job)
    # Every Wednesday at 13:15.
    schedule.every().wednesday.at("13:15").do(job)
    # Every minute, at second 17.
    schedule.every().minute.at(":17").do(job)
    
    # Poll once per second and run whichever jobs are due.
    while True:
        schedule.run_pending()
        time.sleep(1)

    4.我的schedule使用demo

    import sys
    import time
    import schedule
    import os
    import logging
    # Make sure the log directory exists before a file handler is opened in it.
    if not os.path.exists('/var/log/video_download/'):
        os.makedirs('/var/log/video_download')

    # Configure the root logger to record everything from DEBUG upwards.
    log = logging.getLogger()
    log.setLevel(logging.DEBUG)

    fmt = logging.Formatter(
        fmt="%(asctime)s %(pathname)s %(filename)s %(funcName)s %(lineno)s %(levelname)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    # The date in the file name is taken once, when this code runs, so the
    # process keeps writing to the same file for its whole lifetime.
    stream_handler = logging.FileHandler(
        '/var/log/video_download/debug-%s.log' % time.strftime('%Y-%m-%d'))
    stream_handler.setLevel(logging.DEBUG)
    stream_handler.setFormatter(fmt)
    log.addHandler(stream_handler)
    def handler():
        """Scheduled callback; this demo only prints a marker line."""
        marker = "this is handler"
        print(marker)
    def main():
        """Validate the command line and run the scheduler loop.

        The script takes exactly one argument, which must be ``hour``. With
        that argument, ``handler`` is registered to run every day at 00:00 and
        once every hour, and pending jobs are then polled once per second
        forever.

        Raises:
            SystemExit: With status 2, after printing the usage line, when the
                argument is missing, is not ``hour``, or is followed by extra
                arguments.
        """
        # A wrong argument count and a wrong argument value share one usage
        # message, so both are handled by a single guard clause.
        if len(sys.argv) != 2 or sys.argv[1] != 'hour':
            print('python video_data.py hour')
            # Exit non-zero so that shells, cron and supervisors can tell a
            # usage error apart from a successful run.
            sys.exit(2)

        log.debug("enter main")
        schedule.every().day.at("00:00").do(handler)
        schedule.every().hour.do(handler)
        while True:
            schedule.run_pending()
            time.sleep(1)
    
    
    # Call main() only when this file is executed as a script, not when imported.
    if __name__ == "__main__":
        main()

    5.拓展:

      并行执行任务

      (1)默认情况下,schedule按顺序依次执行所有作业,因为很难找到一种让所有人都满意的并行执行模型。如果需要并行执行,可以让每个作业在各自的线程中运行:

    import threading
    import time
    import schedule
    
    
    def job():
        """Print the thread that this call is executing on."""
        running_on = threading.current_thread()
        print("I'm running on thread %s" % running_on)
    
    
    def run_threaded(job_func):
        """Start ``job_func`` on a new thread and return without waiting for it.

        Args:
            job_func: Zero-argument callable used as the thread target.
        """
        threading.Thread(target=job_func).start()
    
    
    # Register five identical jobs. Each run is handed to run_threaded, which
    # starts the job on its own thread instead of running it inline.
    for _ in range(5):
        schedule.every(10).seconds.do(run_threaded, job)


    # Poll once per second and trigger whichever jobs are due.
    while True:
        schedule.run_pending()
        time.sleep(1)

      (2)如果需要更严格地控制线程数量,可以使用一个共享的作业队列(queue)配合一个或多个工作线程:

    import Queue
    import time
    import threading
    import schedule
    
    
    def job():
        """Print a fixed message each time the job is executed."""
        message = "I'm working"
        print(message)
    
    
    def worker_main():
        """Run queued jobs one at a time, forever.

        Blocks on the module-level ``jobqueue`` until a callable is available,
        calls it with no arguments, and then marks that queue item as done.

        An exception raised by a job is not caught here: it propagates to the
        caller and ends the loop.
        """
        while True:
            job_func = jobqueue.get()
            try:
                job_func()
            finally:
                # Mark the item done even when the job raises, so the queue's
                # unfinished-task count stays accurate and jobqueue.join()
                # cannot block on an item that was already taken.
                jobqueue.task_done()
    
    # NOTE(review): ``Queue`` is the Python 2 module name; on Python 3 the
    # standard-library module is called ``queue``.
    jobqueue = Queue.Queue()

    # Each scheduled run only enqueues the job; the worker thread executes it.
    for _ in range(5):
        schedule.every(10).seconds.do(jobqueue.put, job)

    # A single worker thread drains the queue, so queued jobs run one after
    # another rather than concurrently.
    worker_thread = threading.Thread(target=worker_main)
    worker_thread.start()

    # Poll once per second and enqueue whichever jobs are due.
    while True:
        schedule.run_pending()
        time.sleep(1)

      (3)异常处理:捕获作业抛出的异常

    import functools
    
    def catch_exceptions(cancel_on_failure=False):
        """Build a decorator that stops a failing job from raising into its caller.

        Args:
            cancel_on_failure: When true, a failing call returns
                ``schedule.CancelJob``; otherwise it returns ``None``.

        Returns:
            A decorator. The decorated function returns the job's own result
            on success. When the job raises an ``Exception``, the traceback is
            printed and the value described above is returned instead.
        """
        def catch_exceptions_decorator(job_func):
            @functools.wraps(job_func)
            def wrapper(*args, **kwargs):
                try:
                    return job_func(*args, **kwargs)
                except Exception:
                    # Catch Exception rather than using a bare ``except`` so
                    # that KeyboardInterrupt and SystemExit still propagate
                    # and the process can be stopped normally.
                    import traceback
                    print(traceback.format_exc())
                    if cancel_on_failure:
                        return schedule.CancelJob
                    return None
            return wrapper
        return catch_exceptions_decorator
    
    @catch_exceptions(cancel_on_failure=True)
    def bad_task():
        """Always fail with ZeroDivisionError, to exercise the decorator."""
        return 1 / 0
    
    # With cancel_on_failure=True the wrapper returns schedule.CancelJob when
    # bad_task fails.
    schedule.every(5).minutes.do(bad_task)

      (4)只运行一次

    def job_that_executes_once():
        """Do the work, then return schedule.CancelJob so the job runs only once."""
        # Do some work ...
        return schedule.CancelJob
    
    # Scheduled for 22:30 every day.
    schedule.every().day.at('22:30').do(job_that_executes_once)

      (5)一次取消多个任务

    def greet(name):
        """Print a greeting for ``name``."""
        greeting = 'Hello {}'.format(name)
        print(greeting)
    
    # Attach one or more tags to each job when registering it.
    schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
    schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
    schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
    schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')
    
    # Cancel the jobs tagged 'daily-tasks' in one call (here: Andrea and Derek).
    schedule.clear('daily-tasks')

      (6)在任务中加入日志功能

    import functools
    import time
    
    import schedule
    
    
    # This decorator can be applied to any job function to print a line before
    # and after each run.
    def with_logging(func):
        """Wrap ``func`` so that every call is bracketed by two LOG lines on stdout.

        Args:
            func: The job function to wrap.

        Returns:
            A wrapper that forwards all arguments to ``func`` and returns its
            result. The "completed" line is printed only if ``func`` returns.
        """
        @functools.wraps(func)
        def logged(*args, **kwargs):
            print('LOG: Running job "%s"' % func.__name__)
            outcome = func(*args, **kwargs)
            print('LOG: Job "%s" completed' % func.__name__)
            return outcome
        return logged
    
    @with_logging
    def job():
        """Print a greeting; the decorator adds the surrounding LOG lines."""
        text = 'Hello, World.'
        print(text)
    
    # Run the logged job every 3 seconds.
    schedule.every(3).seconds.do(job)
    
    # Poll once per second and run whichever jobs are due.
    while 1:
        schedule.run_pending()
        time.sleep(1)

      (7)以随机的时间间隔运行作业

    def my_job():
        """Print ``Foo`` each time the job is executed."""
        # This job will execute every 5 to 10 seconds.
        output = 'Foo'
        print(output)
    
    # Run my_job at a randomly chosen interval of 5 to 10 seconds.
    schedule.every(5).to(10).seconds.do(my_job)

      (8)传参给作业函数

    def greet(name):
        """Print ``Hello`` followed by ``name``."""
        salutation = 'Hello'
        print(salutation, name)
    
    # Extra arguments given to do() are passed on to the job function when it runs.
    schedule.every(2).seconds.do(greet, name='Alice')
    schedule.every(4).seconds.do(greet, name='Bob')

  • 相关阅读:
    sublime text 配置本地静态服务器方法
    js中获得当前时间是年份和月份
    如何在Intellij IDEA中拉svn分支?
    快速上手seajs模块化以及案例
    webpack 配置多页面应用的一次尝试
    【Gitlab】gitlab-CI 持续集成以及runner的配置简版
    【vue】elementUI报错:_self.$scopedSlots.default is not a function
    【webpack】webpack多版本控制方案
    vuepress博客主题—vuepress-theme-reco
    reco-fetch
  • 原文地址:https://www.cnblogs.com/kakawith/p/10432526.html
Copyright © 2011-2022 走看看