zoukankan      html  css  js  c++  java
  • Flask中使用apscheduler

    import logging
    import os
    import time
    import uuid

    from apscheduler.executors.pool import ProcessPoolExecutor
    from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
    from apscheduler.jobstores.mongodb import MongoDBJobStore
    from apscheduler.schedulers.background import BackgroundScheduler
    from flask import Flask,request
    from pymongo import MongoClient

    from  Secdeer.mongoclient import Client as LocalMongoClient
    logger = logging.getLogger(__name__)

    # MongoDB connection settings. Each value can be overridden through an
    # environment variable so credentials do not have to live in the source;
    # the fallbacks are the values that used to be hard-coded here.
    MONGO_HOST = os.environ.get('MONGO_HOST', '172.30.2.21')
    MONGO_PORT = int(os.environ.get('MONGO_PORT', '27017'))
    MONGO_USERNAME = os.environ.get('MONGO_USERNAME', 'admin123')
    MONGO_PASSWORD = os.environ.get('MONGO_PASSWORD', 'admin123')
    MONGO_DATABASE = os.environ.get('MONGO_DATABASE', 'zero')

    # Project-local Mongo wrapper used by the routes for the "UserStrate" documents.
    mdb = LocalMongoClient(
        username=MONGO_USERNAME,
        password=MONGO_PASSWORD,
        host=MONGO_HOST,
        port=MONGO_PORT,
        db=MONGO_DATABASE,
    )
    # Plain pymongo client handed to APScheduler's MongoDB job store.
    client = MongoClient(
        MONGO_HOST,
        MONGO_PORT,
        username=MONGO_USERNAME,
        password=MONGO_PASSWORD,
    )
    # Jobs are persisted in the "LoopTask" collection of the configured database.
    jobstores = {
        'default': MongoDBJobStore(client=client, database=MONGO_DATABASE, collection='LoopTask'),
    }
    # Jobs run on a thread pool of at most 10 workers.
    executors = {
        'default': ThreadPoolExecutor(max_workers=10)
    }
    def job_function():
        """Print ``job1 --- <local timestamp>`` (timestamp as YYYY-MM-DD HH:MM:SS)."""
        now = time.localtime()
        stamp = time.strftime('%Y-%m-%d %H:%M:%S', now)
        print(f'job1 --- {stamp}')
    # 2. Create the scheduler.
    scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors)
    # The default job store is persistent (MongoDB). Without an explicit id,
    # every application start would store one more copy of this interval job.
    # A fixed id plus replace_existing=True keeps exactly one copy across
    # restarts. Copies stored under auto-generated ids by earlier runs are not
    # cleaned up by this and have to be removed from the store separately.
    scheduler.add_job(
        job_function,
        'interval',
        seconds=3,
        id='job_function_interval',
        replace_existing=True,
    )
    # BackgroundScheduler.start() returns immediately; jobs run on worker threads.
    scheduler.start()

    app = Flask(__name__)
    
    
    
    
    # 3. Define the scheduled task.
    def print_test():
        """Job body used by the dynamically added jobs: prints a fixed marker line."""
        marker = "print_test"
        print(marker)
    
    # Start the scheduler on demand.
    @app.route('/start')
    def start():
        """Start the background scheduler if it is not already running.

        The scheduler is already started when this module is imported, and
        calling ``start()`` on a running scheduler raises
        ``SchedulerAlreadyRunningError``; the ``running`` check makes this
        endpoint safe to call at any time.

        Returns:
            The success message as the response body.
        """
        # 5. Start the scheduler (non-blocking).
        if not scheduler.running:
            scheduler.start()
        return '启动成功'
    
    @app.route("/add_str", methods=['POST'])
    def add_str():
        """Add a "UserStrate" record whose job-id list starts out empty.

        NOTE(review): ``mdb.add`` is a project helper; presumably it inserts the
        given document into the named collection — confirm against its source.
        """
        empty_strategy = {"str_task_id_lsit": []}
        mdb.add("UserStrate", empty_strategy)
        return "添加成功"
    
    # Dynamically add a scheduled job.
    @app.route("/add_job",methods=['POST'])
    def add_job():
        """Schedule ``print_test`` to run once and record the job id on a UserStrate document.

        Form fields:
            seconds: despite its name this is the run date; it is forwarded
                unchanged as ``run_date`` of a ``'date'`` trigger. When it is
                omitted, ``None`` is forwarded, as before.
            str_id: id of the "UserStrate" document whose ``str_task_id_lsit``
                receives the new job id.

        Returns:
            The success message, or an ``(message, status)`` tuple with 400 when
            ``str_id`` is missing or the run date is rejected, and 404 when the
            document cannot be found.
        """
        print(request.form)
        run_date = request.form.get("seconds")
        str_id = request.form.get("str_id")
        if not str_id:
            return "str_id is required", 400

        str_item = mdb.get_one_by_id("UserStrate", str_id)
        # NOTE(review): assumes the helper returns a falsy value (e.g. None)
        # when no document matches — confirm against Secdeer.mongoclient.
        if not str_item:
            return "UserStrate not found: {}".format(str_id), 404
        # Tolerate documents that were stored without the list.
        task_ids = str_item.get("str_task_id_lsit") or []

        print(request.method)
        print("add_job")
        print(run_date)

        str_uuid = str(uuid.uuid4())
        # Schedule first: if the trigger rejects the run date, the document must
        # not end up holding an id for a job that was never created.
        try:
            #scheduler.add_job(print_test, 'interval', seconds=3, args=[], id=str(uuid.uuid4()))
            scheduler.add_job(print_test, 'date', run_date=run_date, id=str_uuid)
        except ValueError as exc:
            return "invalid run date: {}".format(exc), 400

        task_ids.append(str_uuid)
        mdb.update_one_by_id("UserStrate", str_id, {"str_task_id_lsit": task_ids})
        return "add_job成功"
    
    # Dynamically pause a scheduled job.
    @app.route("/pause_job/<param>")
    def pause_job(param):
        """Pause the job whose id is ``param``.

        Args:
            param: job id taken from the URL path.

        Returns:
            The success message, or ``(message, 404)`` when the scheduler has
            no job with that id (previously an unhandled ``JobLookupError``).
        """
        # Local import keeps this handler self-contained.
        from apscheduler.jobstores.base import JobLookupError

        try:
            scheduler.pause_job(param)
        except JobLookupError:
            return "job not found: {}".format(param), 404
        return "动态暂停定时任务成功:{}".format(param)
    
    # Dynamically resume a paused job.
    @app.route("/resume_job/<param>")
    def resume_job(param):
        """Resume the job whose id is ``param``.

        Args:
            param: job id taken from the URL path.

        Returns:
            The success message, or ``(message, 404)`` when the scheduler has
            no job with that id (previously an unhandled ``JobLookupError``).
        """
        # Local import keeps this handler self-contained.
        from apscheduler.jobstores.base import JobLookupError

        try:
            scheduler.resume_job(param)
        except JobLookupError:
            return "job not found: {}".format(param), 404
        return "动态恢复定时任务成功:{}".format(param)
    
    # Dynamically remove a scheduled job.
    @app.route("/remove_job/<param>")
    def remove_job(param):
        """Remove the job whose id is ``param`` from the scheduler.

        Args:
            param: job id taken from the URL path.

        Returns:
            The success message, or ``(message, 404)`` when the scheduler has
            no job with that id (previously an unhandled ``JobLookupError``).
        """
        # Local import keeps this handler self-contained.
        from apscheduler.jobstores.base import JobLookupError

        try:
            scheduler.remove_job(param)
        except JobLookupError:
            return "job not found: {}".format(param), 404
        return "动态删除定时任务成功:{}".format(param)
    
    # Remove all scheduled jobs recorded on one UserStrate document.
    @app.route("/del_job",methods=['POST'])
    def del_job():
        """Remove every job whose id is listed on the given "UserStrate" document.

        Form fields:
            str_id: id of the document whose ``str_task_id_lsit`` holds the job ids.

        Removal is best effort: a failure for one job id is printed and the loop
        moves on to the next id. The id list stored on the document is left
        unchanged.

        Returns:
            The success message, or an ``(message, status)`` tuple with 400 when
            ``str_id`` is missing and 404 when the document cannot be found.
        """
        str_id = request.form.get("str_id")
        if not str_id:
            return "str_id is required", 400

        str_item = mdb.get_one_by_id("UserStrate", str_id)
        # NOTE(review): assumes the helper returns a falsy value (e.g. None)
        # when no document matches — confirm against Secdeer.mongoclient.
        if not str_item:
            return "UserStrate not found: {}".format(str_id), 404

        # Tolerate documents that were stored without the list.
        for str_task_id in str_item.get("str_task_id_lsit") or []:
            print(str_task_id)
            try:
                scheduler.remove_job(str_task_id)
            except Exception as e:
                # Deliberately broad: one job that cannot be removed (for
                # example an id the scheduler no longer knows) must not stop
                # the remaining ids from being processed.
                print(e)
        return "关闭所有定时任务成功"
    
    @app.route("/test")
    def test():
        """Smoke-test endpoint: prints "test" and returns it as the response body."""
        reply = "test"
        print(reply)
        return reply
    # Script entry point: serve the app with Flask's built-in server using its defaults.
    if __name__ == "__main__":
        app.run()
    View Code

     测试:

    import  requests
    # Bulk example, kept disabled: registers 1000 one-off jobs on the document.
    # for i in range(1000):
    #     requests.post(url="http://127.0.0.1:5000/add_job", data={
    #         "seconds" :"2020-12-12 00:00:00",
    #         "str_id":"5f608db58daef5d2e28c550b"
    #     })
    
    # Ask the server to remove every job recorded on the document.
    response = requests.post(
        url="http://127.0.0.1:5000/del_job",
        data={
            #"seconds" :"2020-12-12 00:00:00",
            "str_id":"5f608db58daef5d2e28c550b"
        },
        # requests applies no timeout by default; without one an unresponsive
        # server would block this script indefinitely.
        timeout=10,
    )
    # Fail loudly on a 4xx/5xx answer instead of silently ignoring it.
    response.raise_for_status()
    print(response.text)
    View Code
  • 相关阅读:
    BZOJ 1143 [CTSC2008]祭祀river
    BZOJ 3997 [TJOI2015]组合数学
    BZOJ 3996 [TJOI2015]线性代数
    BZOJ 4553 [Tjoi2016&Heoi2016]序列
    微信开发之密文模式 mcrypt_module_open 走不过
    JS JSON & ARRAY 遍历
    linux ftp服务器配置(Ubuntu)
    thinkphp 吐槽篇
    游戏--疯狂猜字随机混乱正确答案逻辑
    PHP 批量去除BOM头;此文转载;
  • 原文地址:https://www.cnblogs.com/weidaijie/p/13679088.html
Copyright © 2011-2022 走看看