zoukankan      html  css  js  c++  java
  • flask-apscheduler的使用

    Flask-APScheduler是根据APScheduler编写的一个flask模块,它提供了API管理任务。

    Advanced Python Scheduler(APScheduler)是一个Python库,可让Python代码稍后执行,一次或定期执行。

    安装

    pip install Flask-APScheduler
    

      

    配置  

    可以参考githup的范列

    test.py

    from flask import Flask
    from apscheduler.schedulers.background import BackgroundScheduler
    from flask_apscheduler import APScheduler
    
    '''flask_apscheduler的JOBS可以在Config中配置,
    也可以通过装饰器调用,还可以通过flask_apschedule的api进行添加
    job stores 默认为内存, 可以下在flask的Config中配置为存储在数据库中
    '''
    
    class Config(object):
        # JOBS可以在配置里面配置
        JOBS = [{
            'id': 'job1',
            'func': 'test:job1',
            'args': (1, 2),
            'trigger': 'interval',
            'seconds': 10
        }]
        SCHEDULER_TIMEZONE = 'Asia/Shanghai'  # 配置时区
        SCHEDULER_API_ENABLED = True  # 添加API
        # 配置job store
        # DATABASE_URL="mysql+pymysql://scott:xxxx@192.168.0.95:3306/test?charset=utf8mb4"
        # SCHEDULER_JOBSTORES = {
        #     'default': SQLAlchemyJobStore(url=DATABASE_URL)
        # }
    
    # 初始化调度器
    scheduler = APScheduler(BackgroundScheduler(timezone="Asia/Shanghai"))
    
    
    def job1(a, b):
        print(str(a) + ' ' + str(b))
    
    
    # 使用装饰器调用
    @scheduler.task('interval', id='job_2', seconds=30, misfire_grace_time=900)
    def job2():
        print('Job 2 executed')
    
    
    if __name__ == '__main__':
        app = Flask(__name__)
        app.config.from_object(Config())
        # it is also possible to enable the API directly
        # scheduler.api_enabled = True
        scheduler.init_app(app)
        scheduler.start()
        app.run(host='0.0.0.0', port=5000)
    

      

    schedule的三种运行时间设置

    安排工作时,需要为其选择一个触发器触发器确定运行作业时通过其计算日期/时间的逻辑。APScheduler带有三种内置的触发器类型:

    • date:在您希望在特定时间仅运行一次作业时使用

    • interval:在您要以固定的时间间隔运行作业时使用

    • cron:当您要在一天的特定时间定期运行作业时使用

    scheduler.add_job(func=job2, args=('定时任务',), trigger='cron', second='*/5')
    scheduler.add_job(func=job2, args=('一次性任务',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12))
    scheduler.add_job(func=job2, args=('循环任务',), trigger='interval', seconds=3)
    

      

    Flask-APSchedule提供的API

    scheduler.add_job                  POST                      /scheduler/jobs
    scheduler.delete_job               DELETE                    /scheduler/jobs/<job_id>
    scheduler.get_job                  GET                       /scheduler/jobs/<job_id>
    scheduler.get_jobs                 GET                       /scheduler/jobs
    scheduler.get_scheduler_info       GET                       /scheduler
    scheduler.pause_job                POST                      /scheduler/jobs/<job_id>/pause
    scheduler.resume_job               POST                      /scheduler/jobs/<job_id>/resume
    scheduler.run_job                  POST                      /scheduler/jobs/<job_id>/run
    scheduler.update_job               PATCH                     /scheduler/jobs/<job_id>
    

      

  • 相关阅读:
    jquery遍历节点 children(),next(),prev(),siblings()closest() 等一些常用方法...
    jq 分页
    java8 array、list操作 汇【4】)- Java8 Lambda表达式 函数式编程【思想】
    java8 array、list操作 汇【2】)- (Function,Consumer,Predicate,Supplier)应用
    Java8 (Function,Consumer,Predicate,Supplier)详解
    Elasticsearch
    org.apache.commons.lang3.Validate
    freemarker逻辑判断写法#if
    11 Sping框架--AOP的相关概念及其应用
    10 Spring框架--基于注解的IOC配置
  • 原文地址:https://www.cnblogs.com/zydev/p/13865535.html
Copyright © 2011-2022 走看看