zoukankan      html  css  js  c++  java
  • flask-apscheduler的使用

    Flask-APScheduler是根据APScheduler编写的一个flask模块,它提供了API管理任务。

    Advanced Python Scheduler(APScheduler)是一个Python库,可让Python代码稍后执行,一次或定期执行。

    安装

    pip install Flask-APScheduler
    

      

    配置  

    可以参考githup的范列

    test.py

    from flask import Flask
    from apscheduler.schedulers.background import BackgroundScheduler
    from flask_apscheduler import APScheduler
    
    '''flask_apscheduler的JOBS可以在Config中配置,
    也可以通过装饰器调用,还可以通过flask_apschedule的api进行添加
    job stores 默认为内存, 可以下在flask的Config中配置为存储在数据库中
    '''
    
    class Config(object):
        # JOBS可以在配置里面配置
        JOBS = [{
            'id': 'job1',
            'func': 'test:job1',
            'args': (1, 2),
            'trigger': 'interval',
            'seconds': 10
        }]
        SCHEDULER_TIMEZONE = 'Asia/Shanghai'  # 配置时区
        SCHEDULER_API_ENABLED = True  # 添加API
        # 配置job store
        # DATABASE_URL="mysql+pymysql://scott:xxxx@192.168.0.95:3306/test?charset=utf8mb4"
        # SCHEDULER_JOBSTORES = {
        #     'default': SQLAlchemyJobStore(url=DATABASE_URL)
        # }
    
    # 初始化调度器
    scheduler = APScheduler(BackgroundScheduler(timezone="Asia/Shanghai"))
    
    
    def job1(a, b):
        print(str(a) + ' ' + str(b))
    
    
    # 使用装饰器调用
    @scheduler.task('interval', id='job_2', seconds=30, misfire_grace_time=900)
    def job2():
        print('Job 2 executed')
    
    
    if __name__ == '__main__':
        app = Flask(__name__)
        app.config.from_object(Config())
        # it is also possible to enable the API directly
        # scheduler.api_enabled = True
        scheduler.init_app(app)
        scheduler.start()
        app.run(host='0.0.0.0', port=5000)
    

      

    schedule的三种运行时间设置

    安排工作时,需要为其选择一个触发器触发器确定运行作业时通过其计算日期/时间的逻辑。APScheduler带有三种内置的触发器类型:

    • date:在您希望在特定时间仅运行一次作业时使用

    • interval:在您要以固定的时间间隔运行作业时使用

    • cron:当您要在一天的特定时间定期运行作业时使用

    scheduler.add_job(func=job2, args=('定时任务',), trigger='cron', second='*/5')
    scheduler.add_job(func=job2, args=('一次性任务',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12))
    scheduler.add_job(func=job2, args=('循环任务',), trigger='interval', seconds=3)
    

      

    Flask-APSchedule提供的API

    scheduler.add_job                  POST                      /scheduler/jobs
    scheduler.delete_job               DELETE                    /scheduler/jobs/<job_id>
    scheduler.get_job                  GET                       /scheduler/jobs/<job_id>
    scheduler.get_jobs                 GET                       /scheduler/jobs
    scheduler.get_scheduler_info       GET                       /scheduler
    scheduler.pause_job                POST                      /scheduler/jobs/<job_id>/pause
    scheduler.resume_job               POST                      /scheduler/jobs/<job_id>/resume
    scheduler.run_job                  POST                      /scheduler/jobs/<job_id>/run
    scheduler.update_job               PATCH                     /scheduler/jobs/<job_id>
    

      

  • 相关阅读:
    LeetCode Binary Tree Inorder Traversal
    LeetCode Populating Next Right Pointers in Each Node
    LeetCode Construct Binary Tree from Inorder and Postorder Traversal
    LeetCode Reverse Linked List II
    LeetCode Populating Next Right Pointers in Each Node II
    LeetCode Pascal's Triangle
    Palindrome Construct Binary Tree from Preorder and Inorder Traversal
    Pascal's Triangle II
    LeetCode Word Ladder
    LeetCode Binary Tree Zigzag Level Order Traversal
  • 原文地址:https://www.cnblogs.com/zydev/p/13865535.html
Copyright © 2011-2022 走看看