zoukankan      html  css  js  c++  java
  • flask_apscheduler定时任务组件使用

    Flask-APScheduler 是Flask框架的一个扩展库,增加了Flask对apScheduler的支持,可以用作特定于平台的调度程序(如cron守护程序或Windows任务调度程序)的跨平台。

    APScheduler有三个可以使用的内置调度系统:

    • Cron式调度(可选的开始/结束时间)
    • 基于区间的执行(偶数间隔运行作业,可选的开始/结束时间)
    • 一次性延迟执行(在设定的日期/时间运行一次作业)

    APScheduler是一个python的第三方库,用来提供python的后台程序。包含四个组件,分别是:

    • triggers: 任务触发器组件,提供任务触发方式
    • job stores: 任务商店组件,提供任务保存方式
    • executors: 任务调度组件,提供任务调度方式
    • schedulers: 任务调度组件,提供任务工作方式

    支持多种存储空间

    • RAM
    • 基于文件的简单数据库
    • SQLAlchem
    • MongoDB
    • Redis

    简单使用

    appblueprints/task.py

    import uuid
    import utils
    from flask_apscheduler import APScheduler
    from flask import Blueprint, jsonify, request
    
    
    
    Taskapi = Blueprint("task", __name__, url_prefix="/task")
    
    Scheduler = None
    taskdict = {}
    
    def init():
        global Scheduler
        Scheduler = APScheduler()
        return Scheduler
    
    
    # 暂停任务
    # http://127.0.0.1:5000/task/pause?id=2
    @Taskapi.route('/pause', methods=['GET'])
    def pause_job():
        job_id = request.args.get('id')
        Scheduler.pause_job(str(job_id))
        response = {}
        response["msg"] = "success"
        return jsonify(response)
    
    
    # 恢复任务
    # http://127.0.0.1:5000/task/resume?id=2
    @Taskapi.route('/resume', methods=['GET'])
    def resume_job():
        job_id = request.args.get('id')
        Scheduler.resume_job(str(job_id))
        response = {}
        response["msg"] = "success"
        return jsonify(response)
    
    
    # 获取任务
    # http://127.0.0.1:5000/task/getjobs
    @Taskapi.route('/getjobs', methods=['GET'])
    def get_task():
        # jobs = Scheduler.get_jobs()
        # print(str(pickle.dumps(jobs)))
        return jsonify(taskdict)
    
    
    # 移除任务
    @Taskapi.route('/removejob', methods=['GET'])
    def remove_job():
        job_id = request.args.get('id')
        Scheduler.remove_job(str(job_id))
        response = {}
        response["msg"] = "success"
        return jsonify(response)
    
    
    
    # 添加任务
    # http://cab912dac880.ngrok.io/task/addjob?tasktype=interval&minute=10&psm=caijing.charge.union_service&tag=prod&env=product&chat_id=6911623998451269634
    @Taskapi.route('/addjob', methods=['GET'])
    def add_task():
        global taskdict
        psm = request.args.get('psm', "cmp.ecom.settle")
        tag = request.args.get('tag', "prod")
        env = request.args.get('env', "boe")
        chat_id = request.args.get('chat_id', "6911623998451269634")
        tasktype = request.args.get('tasktype', "interval")
        minute = request.args.get('minute', "10")
        minute = float(minute)
        response = {}
        response["msg"] = "success"
        seconds = minute * 60
        # trigger='cron' 表示是一个定时任务
        # if tasktype == 'corn':
        #     id = str(uuid.uuid4())
        #     response["taskid"] = id
        #     Scheduler.add_job(func=test, id='1', args=(1, 1), trigger='cron', day_of_week='0-6', hour=18, minute=24,
        #                       second=10, replace_existing=True)
        # trigger='interval' 表示是一个循环任务,每隔多久执行一次
        if tasktype == "interval":
            id = str(uuid.uuid4())
            response["taskid"] = id
            response["data"] = [psm,env,tag,str(seconds)+""]
            taskdict[id] = response["data"]
            Scheduler.add_job(func=utils.start, id=id, args=(psm,env,tag,seconds,id,minute,chat_id), trigger='interval', seconds=seconds,
                              replace_existing=True)
        else:
            response["id"] = ""
            response["msg"] = "tasktype 类型不存在"
    
        return jsonify(response)

    main.py

    from flask import Flask
    from appblueprints import task
    
    # 创建app
    app = Flask(__name__)# 注册蓝图
    app.register_blueprint(task.Taskapi)
    
    if __name__ == '__main__':
        scheduler = task.init()
        scheduler.init_app(app=app)
        scheduler.start()
        app.run(host="0.0.0.0", port=6000)

    triggers: 支持三种任务触发方式

    • date:固定日期触发器,任务只运行一次,运行完毕自动清除;若错过指定运行时间,任务不会被创建

      | 参数 | 说明 |
      | :——————————– | :——————- |
      | run_date (datetime 或 str) | 作业的运行日期或时间 |
      | timezone (datetime.tzinfo 或 str) | 指定时区 |

      1
      2
      例如# 在 2019-4-24 00:00:01 时刻运行一次 start_system 方法
      scheduler .add_job(start_system, 'date', run_date='2019-4-24 00:00:01', args=['text'])
    • interval:时间间隔触发器,每个一定时间间隔执行一次。

      | 参数 | 说明 |
      | —————————- | ———- |
      | weeks (int) | 间隔几周 |
      | days (int) | 间隔几天 |
      | hours (int) | 间隔几小时 |
      | minutes (int) | 间隔几分钟 |
      | seconds (int) | 间隔多少秒 |
      | start_date (datetime 或 str) | 开始日期 |
      | end_date (datetime 或 str) | 结束日期 |

      1
      2
      # 在 2019-4-24 00:00:00 - 2019-4-24 08:00:00 之间, 每隔两小时执行一次 alarm_job 方法
      scheduler .add_job(alarm_job, 'interval', hours=2, start_date='2019-4-24 00:00:00' , end_date='2019-4-24 08:00:00')
    • cron:cron风格的任务触发
    参数说明
    year (int 或 str) 表示四位数的年份 (2019)
    month(int str) 月 (范围1-12)
    day(int str) 日 (范围1-31)
    week(int str) 周 (范围1-53)
    day_of_week (int str) 表示一周中的第几天,既可以用0-6表示也可以用其英语缩写表示
    hour (int str) 表示取值范围为0-23时
    minute (int str) 表示取值范围为0-59分
    second (int str) 表示取值范围为0-59秒
    start_date (datetime str) 表示开始时间
    end_date (datetime str) 表示结束时间
    timezone (datetime.tzinfo str) 表示时区取值

    (int|str) 表示参数既可以是int类型,也可以是str类型
    (datetime | str) 表示参数既可以是datetime类型,也可以是str类型

    例如:表示每5秒执行该程序一次,相当于interval 间隔调度中seconds = 5

    1
    sched.add_job(my_job, 'cron',second = '*/5')

    job stores: 支持四种任务存储方式

    • memory:默认配置任务存在内存中
    • mongdb:支持文档数据库存储
    • sqlalchemy:支持关系数据库存储
    • redis:支持键值对数据库存储

    schedulers: 调度器主要分三种,一种独立运行的,一种是后台运行的,最后一种是配合其它程序使用

    • BlockingScheduler: 当这个调度器是你应用中 唯一要运行 的东西时使用
    • BackgroundScheduler: 当 不运行其它框架 的时候使用,并使你的任务在 后台运行
    • AsyncIOScheduler: 当你的程序是 异步IO模型 的时候使用
    • GeventScheduler: 和 gevent 框架配套使用
    • TornadoScheduler: 和 tornado 框架配套使用
    • TwistedScheduler: 和 Twisted 框架配套使用
    • QtScheduler: 开发 qt 应用的时候使用
  • 相关阅读:
    梯度方向问题
    switchsharp
    R语言学习笔记:sort、rank、order、arrange排序函数
    R语言学习笔记:choose、factorial、combn排列组合函数
    MySQL学习笔记:少用Null
    Oracle学习笔记:11g服务介绍及哪些服务必须开启?
    GreenPlum学习笔记:create or replace function创建函数
    Python学习笔记:出生日期转化为年龄
    Python学习笔记:import sys模块(argv、path、platform、exit)
    Oracle学习笔记:wm_concat函数合并字段
  • 原文地址:https://www.cnblogs.com/-wenli/p/14737476.html
Copyright © 2011-2022 走看看