from flask import Flask,request from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.executors.pool import ProcessPoolExecutor from pymongo import MongoClient from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor import uuid import time from Secdeer.mongoclient import Client as LocalMongoClient import logging logger = logging.getLogger(__name__) mdb = LocalMongoClient(username='admin123', password='admin123', host='172.30.2.21', port=27017,db="zero") client = MongoClient('172.30.2.21', 27017, username='admin123', password='admin123') jobstores = { 'default': MongoDBJobStore(client=client, database='zero', collection='LoopTask'), } executors = { 'default': ThreadPoolExecutor(max_workers=10) } def job_function(): t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) print('job1 --- {}'.format(t)) # 2.创建调度器 scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors) scheduler.add_job(job_function, 'interval', seconds=3) scheduler.start() app = Flask(__name__) # 3.定义定时任务 def print_test(): print("print_test") # 动态启动定时任务 @app.route('/start') def start(): # 5.启动调度器,不阻塞 scheduler.start() return '启动成功' @app.route("/add_str",methods=['POST']) def add_str(): mdb.add("UserStrate", {"str_task_id_lsit": []}) return "添加成功" # 动态添加定时任务 @app.route("/add_job",methods=['POST']) def add_job(): print(request.form) seconds = request.form.get("seconds") str_id = request.form.get("str_id") str_item= mdb.get_one_by_id("UserStrate",str_id) str_task_id_lsit = str_item.get("str_task_id_lsit") str_uuid = str(uuid.uuid4()) str_task_id_lsit.append(str_uuid) mdb.update_one_by_id("UserStrate",str_id,{"str_task_id_lsit":str_task_id_lsit}) print(request.method) print("add_job") print(seconds) #scheduler.add_job(print_test, 'interval', seconds=3, args=[], id=str(uuid.uuid4())) scheduler.add_job(print_test, 'date', run_date=seconds, id=str_uuid) return "add_job成功" # 动态暂停定时任务 @app.route("/pause_job/<param>") def pause_job(param): scheduler.pause_job(param) return "动态暂停定时任务成功:{}".format(param) # 动态恢复定时任务 @app.route("/resume_job/<param>") def resume_job(param): scheduler.resume_job(param) return "动态恢复定时任务成功:{}".format(param) # 动态删除定时任务 @app.route("/remove_job/<param>") def remove_job(param): scheduler.remove_job(param) return "动态删除定时任务成功:{}".format(param) # 关闭所有定时任务 @app.route("/del_job",methods=['POST']) def del_job(): str_id = request.form.get("str_id") str_item = mdb.get_one_by_id("UserStrate", str_id) str_task_id_lsit = str_item.get("str_task_id_lsit") for str_task_id in str_task_id_lsit: print(str_task_id) try: scheduler.remove_job(str_task_id) # str_task_id_lsit.remove(str_task_id) except Exception as e : print(e) continue return "关闭所有定时任务成功" @app.route("/test",) def test(): print("test") return "test" if __name__ == '__main__': app.run()
测试:
import requests # for i in range(1000): # requests.post(url="http://127.0.0.1:5000/add_job", data={ # "seconds" :"2020-12-12 00:00:00", # "str_id":"5f608db58daef5d2e28c550b" # }) requests.post(url="http://127.0.0.1:5000/del_job", data={ #"seconds" :"2020-12-12 00:00:00", "str_id":"5f608db58daef5d2e28c550b" })