zoukankan      html  css  js  c++  java
  • python使用apscheduler实现定时任务

    安装apscheduler

    pip install apscheduler
    

    基本概念

    1、触发器triggers。 触发器包含调度逻辑。每个作业都有自己的触发器,用于确定下一个任务何时运行。触发器有三种内建的trigger:

    1. data: 特定的时间触发
    2. interval: 固定的时间间隔触发
    3. cron: 在特定时间周期性地触发

    2、 任务存储器 job stores。 用于存放任务,把任务存放在内存(为默认MemoryJobStore)或数据库中

    3、执行器 executors。 执行器是将任务提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件

    4、调度器scheduler。 把上方三个组件作为参数,通过创建调度器实例来运行

    根据开发需求选择对应的组件

    BlockingScheduler            阻塞式调度器:适用于只跑调度器的程序。
    BackgroundScheduler      后台调度器:适用于非阻塞的情况,调度器会在后台独立运行。
    AsyncIOScheduler            AsyncIO调度器,适用于应用使用AsnycIO的情况。
    GeventScheduler              Gevent调度器,适用于应用通过Gevent的情况。
    TornadoScheduler            Tornado调度器,适用于构建Tornado应用。
    TwistedScheduler             Twisted调度器,适用于构建Twisted应用。
    QtScheduler                      Qt调度器,适用于构建Qt应用。
    

    使用步骤

    1、新建一个调度器schedulers
    2、添加调度任务
    3、运行调度任务

    使用示例

    触发器date:特点的时间点触发,只执行一次

    from datetime import datetime
    from datetime import date
    from apscheduler.schedulers.blocking import BlockingScheduler
    
    def job(text):    
        print(text)
    
    scheduler = BlockingScheduler()
    # 在 2019-8-30 运行一次 job 方法
    scheduler.add_job(job, 'date', run_date=date(2019, 8, 30), args=['text1'])
    # 在 2019-8-30 01:00:00 运行一次 job 方法
    scheduler.add_job(job, 'date', run_date=datetime(2019, 8, 30, 1, 0, 0), args=['text2'])
    # 在 2019-8-30 01:00:01 运行一次 job 方法
    scheduler.add_job(job, 'date', run_date='2019-8-30 01:00:00', args=['text3'])
    
    scheduler.start()
    

    触发器interval:固定时间间隔触发。

    参数 说明
    weeks(int) 间隔几周
    days(int) 间隔几天
    hours(int) 间隔几小时
    minutes(int) 间隔几分钟
    seconds(int) 间隔多少秒
    start_date(datetime或str) 开始日期
    end_date(datetime或str) 结束日期
    timezone(datetime.tzinfo 或str)
    from apscheduler.schedulers.blocking import BlockingScheduler
    
    
    def run(text):    
        print(text)
    
    
    if __name__ == '__main__':
        scheduler = BlockingScheduler()
        # 每隔 1分钟 运行一次 job 方法
        scheduler.add_job(run, 'interval', minutes=1, args=['job1'])
        # 在 2019-08-29 22:15:00至2019-08-29 22:17:00期间,每隔1分30秒 运行一次 job 方法
        scheduler.add_job(run, 'interval', minutes=1, seconds=30, start_date='2019-08-29 22:15:00', end_date='2019-08-29 22:17:00', args=['job2'])
        scheduler.start()
    

    触发器cron: 在特定的时间周期性地触发。

    参数 说明
    year (int 或 str) 年,4位数字
    month (int 或 str) 月 (范围1-12)
    day (int 或 str) 日 (范围1-31)
    week (int 或 str) 周 (范围1-53)
    day_of_week (int 或 str) 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)
    hour (int 或 str) 时 (范围0-23)
    minute (int 或 str) 分 (范围0-59)
    second (int 或 str) 秒 (范围0-59)
    start_date (datetime 或 str) 最早开始日期(包含)
    end_date (datetime 或 str) 最晚结束时间(包含)
    timezone (datetime.tzinfo 或str) 指定时区
    from apscheduler.schedulers.blocking import BlockingScheduler
    
    
    def exec_job(text):
        print(text)
        
    
    if __name__ == '__main__':
        scheduler = BlockingScheduler()
        # 在每天的22点,每隔一分钟 运行一次exec_job方法
        scheduler.add_job(exec_job, 'cron', hour=22, minute='*/1', args=['job_one'])
        # 在每天的22点和23点的25分, 运行一次exec_job方法
        scheduler.add_job(exec_job, 'cron', hour='22-23', minute='25', args=['result'])
        scheduler.start()
    
    

    添加任务

    添加任务的方法有两种
    (1)、通过add_job()添加任务
    (2)、通过装饰器scheduled_job()

    from datetime import datetime
    from apscheduler.schedulers.blocking import BlockingScheduler
    
    
    scheduler = BlockingScheduler()
    
    
    def tick(message='working'):
        print('Tick! The time is: %s' % datetime.now())
        print("worker status is:", message)
    
    
    def tick_other(status="relaxing"):
        print('Tick! The time is: %s' % datetime.now())
        print("worker status is:", status)
    
    
    # 添加任务的方法有两种 1、通过调用add_job(), 2、通过装饰器scheduled_job()
    # add_job()方法返回一个apscheduler.job.Job实例,可以使用该实例稍后修改或删除该任务
    @scheduler.scheduled_job('interval', seconds=5)
    def tick_another(status="restart"):
        print('Tick! The time is: %s' % datetime.now())
        print("worker status is:", status)
    
    
    if __name__ == '__main__':
        # 在指定期间内, 每隔1分10秒运行一次tick方法
        scheduler.add_job(tick, 'interval', minutes=1, seconds=10,
                          start_date='2020-01-06 14:19:00', end_date='2020-01-06 14:24:00', args=['sleep'])
        # 在每天的11点25分,运行一次tick_other方法
        scheduler.add_job(tick_other, 'cron', hour='11', minute='25', args=['hard working'])
        scheduler.start()
    
    

    参考来源

    在线文档:在线文档
    转载来源:文档

  • 相关阅读:
    如何一键部署项目&&代码自动更新
    Node服务端极速搭建 - nvmhome
    Node服务端极速搭建 -- nvmhome
    自动生成了一本ES6的书
    在linux中给你的应用做压力测试
    .NET 跨平台服务端资料
    CabArc to create or extract a cab file
    (转)什么时候要抛出异常?
    Sprint评审会议不是Sprint演示会议
    Sprint回顾大揭秘——“宝典”来了
  • 原文地址:https://www.cnblogs.com/ppwang06/p/apscheduler.html
Copyright © 2011-2022 走看看