zoukankan      html  css  js  c++  java
  • 定时任务APScheduler使用详解

    cron定时任务

    # coding=utf-8
    """
    Demonstrates how to use the blocking scheduler to schedule a job with a cron trigger
    (every 5 seconds on the configured day of the week).
    """
    
    from datetime import datetime
    import os
    
    from apscheduler.schedulers.blocking import BlockingScheduler
    
    
    def tick():
        """Print a 'Tick!' line carrying the current date and time."""
        now = datetime.now()
        print('Tick! The time is: %s' % now)
    
    
    if __name__ == '__main__':
        # Script entry point: schedule tick() with a cron trigger and block until interrupted.
        scheduler = BlockingScheduler()
        # Fire every 5 seconds, but only on weekday 6
        # (weekdays are numbered 0-6 for mon..sun, so 6 is Sunday).
        scheduler.add_job(tick, 'cron', day_of_week='6', second='*/5')

        # Cron trigger fields:
        #     year (int|str) - 4-digit year
        #     month (int|str) - month (1-12)
        #     day (int|str) - day of month (1-31)
        #     week (int|str) - ISO week (1-53)
        #     day_of_week (int|str) - number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
        #     hour (int|str) - hour (0-23)
        #     minute (int|str) - minute (0-59)
        #     second (int|str) - second (0-59)
        #
        #     start_date (datetime|str) - earliest possible date/time to trigger on (inclusive)
        #     end_date (datetime|str) - latest possible date/time to trigger on (inclusive)
        #     timezone (datetime.tzinfo|str) - time zone to use for the date/time calculations
        #                                      (defaults to scheduler timezone)
        #
        # Expression types (expression / applicable field / meaning):
        #     *        any    Fire on every value
        #     */a      any    Fire every a values, starting from the minimum
        #     a-b      any    Fire on any value within the a-b range (a must be smaller than b)
        #     a-b/c    any    Fire every c values within the a-b range
        #     xth y    day    Fire on the x-th occurrence of weekday y within the month
        #     last x   day    Fire on the last occurrence of weekday x within the month
        #     last     day    Fire on the last day within the month
        #     x,y,z    any    Fire on any matching expression; can combine any number of the above

        print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

        try:
            # Blocking mode: start() does not return; a single thread is dedicated to scheduling.
            scheduler.start()
        except (KeyboardInterrupt, SystemExit):
            # Not strictly necessary if daemonic mode is enabled but should be done if possible
            scheduler.shutdown()
            print('Exit The Job!')
    

    常用调度设置

    每天9点50执行 sched.add_job(任务函数, 'cron', hour=9, minute=50)
    每周一到五9点50执行 sched.add_job(self.job, 'cron', day_of_week='mon-fri', hour=9, minute=50)(注意:APScheduler 的 day_of_week 以 0 表示周一,'1-5' 实际是周二到周六)

    interval间隔执行

    # coding=utf-8
    """
    Demonstrates how to use the background scheduler to schedule a job that executes on 3 second
    intervals.
    """
    
    from datetime import datetime
    import time
    import os
    
    from apscheduler.schedulers.background import BackgroundScheduler
    
    
    def tick():
        """Print a 'Tick!' line carrying the current date and time."""
        current_time = datetime.now()
        print('Tick! The time is: %s' % current_time)
    
    
    if __name__ == '__main__':
        # Script entry point: run tick() every 3 seconds on a background scheduler
        # while the main thread keeps doing its own work.
        sched = BackgroundScheduler()
        sched.add_job(tick, 'interval', seconds=3)  # run tick() once every 3 seconds
        sched.start()  # the scheduling work runs in its own separate thread

        # The interrupt key is Ctrl+Break when os.name is 'nt', Ctrl+C otherwise.
        if os.name == 'nt':
            exit_key = 'Break'
        else:
            exit_key = 'C'
        print('Press Ctrl+{0} to exit'.format(exit_key))

        try:
            # This is here to simulate application activity (which keeps the main thread alive).
            # Other work runs independently of the scheduler thread. With a blocking
            # scheduler, the start() call would be placed inside this section instead.
            while True:
                time.sleep(2)
                print('sleep!')
        except (KeyboardInterrupt, SystemExit):
            # Not strictly necessary if daemonic mode is enabled but should be done if possible
            sched.shutdown()
            print('Exit The Job!')
    

    date指定时间执行

    # coding=utf-8
    """
    Demonstrates how to use the background scheduler to schedule a job that executes once at a
    specified date and time.
    """
    
    from datetime import datetime
    import time
    import os
    
    from apscheduler.schedulers.background import BackgroundScheduler
    
    
    def tick():
        """Print a 'Tick!' line carrying the current date and time."""
        stamp = datetime.now()
        print('Tick! The time is: %s' % stamp)
    
    
    if __name__ == '__main__':
        # Script entry point: run tick() exactly once at a specific moment using a
        # 'date' trigger on a background scheduler.
        from datetime import timedelta  # local import; only this demo block needs it

        scheduler = BackgroundScheduler()
        # The run date is computed relative to "now": a fixed timestamp that already
        # lies in the past is treated as a missed run, so the job would never fire.
        run_date = datetime.now() + timedelta(seconds=5)
        scheduler.add_job(tick, 'date', run_date=run_date)  # run once at the given time
        scheduler.start()  # the scheduling work runs in its own separate thread
        print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

        try:
            # This is here to simulate application activity (which keeps the main thread alive).
            while True:
                time.sleep(2)  # other work runs independently of the scheduler thread
                print('sleep!')
        except (KeyboardInterrupt, SystemExit):
            # Not strictly necessary if daemonic mode is enabled but should be done if possible
            scheduler.shutdown()
            print('Exit The Job!')
    

    参考文档

    --------------------------------------------------------- 恐惧是因为努力的还不够,加油 ~~---------------------------------------------
  • 相关阅读:
    知识点复习
    【程序人生】一个IT人的立功,立言,立德三不朽
    【朝花夕拾】Android多线程之(三)runOnUiThread篇——程序猿们的贴心小棉袄
    【朝花夕拾】Android多线程之(二)ThreadLocal篇
    【朝花夕拾】Android多线程之(一)View.post()篇
    Camera2笔记
    HangFire多集群切换及DashBoard登录验证
    Linq 动态多条件group by
    Api接口签名验证
    postgre ||连接字段
  • 原文地址:https://www.cnblogs.com/zhaobowen/p/13579892.html
Copyright © 2011-2022 走看看