FastAPI + APScheduler: Dynamic Scheduled Tasks


    Contents

    I. APScheduler

    II. Practical Application

    APScheduler

    1. Installation

    pip install apscheduler

    2. Core components

    • triggers: decide when a job should run
    • job stores: hold the scheduled jobs (in memory or in a database)
    • executors: run a job's callable (thread or process pools)
    • schedulers: tie the other components together and expose the API

    3. Choosing the right scheduler, job store, executor, and trigger

    3.1 Schedulers

    • BlockingScheduler: use when the scheduler is the only thing running in the process
    • BackgroundScheduler: use when you are on none of the frameworks below and want the scheduler to run in the background (a minimal sketch follows this list)
    • AsyncIOScheduler: use when the application is built on asyncio
    • GeventScheduler: use when the application is built on gevent
    • TornadoScheduler: use when building a Tornado application
    • TwistedScheduler: use when building a Twisted application
    • QtScheduler: use when building a Qt application
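
    For example, a minimal BackgroundScheduler sketch (the job function and the interval here are illustrative, not from the original post):

    import time
    from apscheduler.schedulers.background import BackgroundScheduler

    def heartbeat():
        print('still alive')

    scheduler = BackgroundScheduler()
    scheduler.add_job(heartbeat, 'interval', seconds=5)  # run every 5 seconds
    scheduler.start()  # returns immediately; jobs run on background threads

    try:
        while True:
            time.sleep(1)  # keep the main thread alive for the demo
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()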

    3.2 Job stores

    • Persistent storage: jobs survive restarts; configure the storage connection through SQLAlchemyJobStore (a sketch follows this list)
    • Non-persistent storage: jobs are recreated on every restart; the default is the in-memory MemoryJobStore
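
    A minimal persistence sketch, assuming a local SQLite file (the URL and the job itself are illustrative):

    from apscheduler.schedulers.background import BackgroundScheduler
    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

    def nightly_report():
        print('running the nightly report')

    jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
    scheduler = BackgroundScheduler(jobstores=jobstores)
    # replace_existing avoids a conflicting-id error when the job is
    # already sitting in the SQLite store from a previous run
    scheduler.add_job(nightly_report, 'cron', hour=2, id='nightly_report',
                      replace_existing=True)
    scheduler.start()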

    3.3 Executors

    • ProcessPoolExecutor: for CPU-intensive work; an optional process pool that can be used alongside the thread pool (see the one-liner after this list)
    • ThreadPoolExecutor: the default thread pool
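
    With both pools configured (as in section 4 below), a job can be routed to the process pool by name; a hedged one-liner, where crunch_numbers is a placeholder for some CPU-bound function:

    # assumes executors={'default': ..., 'processpool': ProcessPoolExecutor(5)}
    # was passed to the scheduler, as in the configuration example below
    scheduler.add_job(crunch_numbers, 'interval', minutes=10,
                      executor='processpool')  # run in the process pool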

    3.4 Triggers

    • date: run the job once, at a single point in time
    • interval: run the job at fixed time intervals
    • cron: run the job periodically at specific times, linux-crontab style (one example of each trigger follows this list)
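
    One add_job call per trigger type, as a quick illustration (my_job and all the times are arbitrary):

    from datetime import datetime

    # date: fire exactly once at the given moment
    scheduler.add_job(my_job, 'date', run_date=datetime(2024, 1, 1, 9, 0))

    # interval: fire every 30 minutes
    scheduler.add_job(my_job, 'interval', minutes=30)

    # cron: fire at 08:30 every Monday through Friday
    scheduler.add_job(my_job, 'cron', day_of_week='mon-fri', hour=8, minute=30)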

    4. Configuring the scheduler

    from pytz import utc
    
    from apscheduler.schedulers.background import BackgroundScheduler
    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
    from apscheduler.executors.pool import ProcessPoolExecutor
    
    
    jobstores = {
        # several job stores can be configured side by side
        'mongo': {'type': 'mongodb'},
        'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')  # SQLAlchemyJobStore takes the storage connection URL
    }
    executors = {
        'default': {'type': 'threadpool', 'max_workers': 20},     # at most 20 worker threads
        'processpool': ProcessPoolExecutor(max_workers=5)         # at most 5 worker processes
    }
    job_defaults = {
        'coalesce': False,   # do not coalesce missed runs when a job was delayed or failed to fire
        'max_instances': 3   # default maximum number of concurrently running instances per job
    }
    scheduler = BackgroundScheduler()
    
    # .. do something else here, maybe add jobs etc.
    
    scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)  # use UTC as the scheduler's timezone
    

    5. Adding, removing, pausing, and inspecting jobs

    import os
    import time
    from apscheduler.schedulers.background import BackgroundScheduler
    
    def print_time(name):
        print(f'{name} - {time.ctime()}')
    
    def add_job(job_id, func, args, seconds):
        """添加job"""
        print(f"添加job - {job_id}")
        scheduler.add_job(id=job_id, func=func, args=args, trigger='interval', seconds=seconds)
    
    def remove_job(job_id):
        """移除job"""
        scheduler.remove_job(job_id)
        print(f"移除job - {job_id}")
    
    def pause_job(job_id):
        """停止job"""
        scheduler.pause_job(job_id)
        print(f"停止job - {job_id}")
    
    def resume_job(job_id):
        """恢复job"""
        scheduler.resume_job(job_id)
        print(f"恢复job - {job_id}")
    
    def get_jobs():
        """获取所有job信息,包括已停止的"""
        res = scheduler.get_jobs()
        print(f"所有job - {res}")
    
    def print_jobs():
        print(f"详细job信息")
        scheduler.print_jobs()
    
    def start():
        """启动调度器"""
        scheduler.start()
    
    def shutdown():
        """关闭调度器"""
        scheduler.shutdown()
    
    if __name__ == '__main__':
        scheduler = BackgroundScheduler()
        start()
        print('Press Ctrl+{0} to exit 
    '.format('Break' if os.name == 'nt' else 'C'))
        add_job('job_A', func=print_time, args=("A", ), seconds=1)
        add_job('job_B', func=print_time, args=("B", ), seconds=2)
        time.sleep(6)
        pause_job('job_A')
        get_jobs()
        time.sleep(6)
        print_jobs()
        resume_job('job_A')
        time.sleep(6)
        remove_job('job_A')
        time.sleep(6)
        try:
            shutdown()
        except RuntimeError:
            pass
    

    6. Scheduler events

    Event listeners can be attached to the scheduler. Scheduler events are fired in certain situations and may carry additional information about that particular event. By passing add_listener() an appropriate mask argument, OR-ing the different event constants together, you can listen for only specific types of events. The listener callable is invoked with a single argument: the event object.

    from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
    
    def my_listener(event):
        if event.exception:
            print('The job crashed :(')
        else:
            print('The job worked :)')
    
    scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
    

    7. Configuring logging

    import logging
    
    logging.basicConfig()
    logging.getLogger('apscheduler').setLevel(logging.DEBUG)
    

    Practical Application

    1. Adding scheduled jobs dynamically in FastAPI

    import asyncio
    import datetime
    import uvicorn
    from fastapi import FastAPI, Body
    from apscheduler.schedulers.asyncio import AsyncIOScheduler
    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
    from apscheduler.executors.pool import ProcessPoolExecutor
    from apscheduler.triggers.cron import CronTrigger
    
    app = FastAPI(title='fast-api')
    
    scheduler = None
    
    
    @app.on_event('startup')
    def init_scheduler():
        """初始化"""
        jobstores = {
            'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')  # SQLAlchemyJobStore指定存储链接
        }
        executors = {
            'default': {'type': 'threadpool', 'max_workers': 20},  # 最大工作线程数20
            'processpool': ProcessPoolExecutor(max_workers=5)  # 最大工作进程数为5
        }
        global scheduler
        scheduler = AsyncIOScheduler()
        scheduler.configure(jobstores=jobstores, executors=executors)
        # 添加一个coroutine执行,结果很不理想...
        scheduler.add_job(tick, 'interval', seconds=3)
        print("启动调度器...")
    
        scheduler.start()
    
    
    def print_time(name):
        print(f'{name} - {datetime.datetime.now()}')
    
    
    async def tick():
        print('Tick! The time is: %s' % datetime.datetime.now())
        await asyncio.sleep(1)
    
    
    @app.post('/add-job')
    async def add_job(job_id: str = Body(...), cron: str = Body(...)):
        """添加job"""
        scheduler.add_job(id=job_id, func=print_time, args=(job_id, ), trigger=CronTrigger.from_crontab(cron))
        return {"msg": "success!"}
    
    
    @app.post('/remove-job')
    async def remove_job(job_id: str = Body(..., embed=True)):
        """移除job"""
        scheduler.remove_job(job_id)
        return {"msg": "success!"}
    
    
    if __name__ == '__main__':
        uvicorn.run(app, host='127.0.0.1', port=5566)
    

    1.1 Problems

    1. AsyncIOScheduler() is problematic with coroutine jobs
    2. In a multi-process deployment, APScheduler loads every job once per process, so jobs run repeatedly. How can this be solved?
      • The official recommendation is to expose the scheduler through http, rpyc, grpc, or a similar mechanism

    2. Setting up an rpyc service for scheduled tasks

    2.1 The scheduled business code lives inside the rpyc service (a sketch follows)
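
    The post shows no code for this variant; a minimal sketch, assuming the business function lives next to the scheduler inside the rpyc process (all names here are illustrative):

    # rpc_server.py - sketch: the job body itself lives in the rpyc process
    import rpyc
    from rpyc.utils.server import ThreadedServer
    from apscheduler.schedulers.background import BackgroundScheduler

    def sync_orders():
        # business logic duplicated here, outside the main project
        print('syncing orders...')

    class SchedulerService(rpyc.Service):
        def exposed_add_job(self, job_id, minute):
            scheduler.add_job(sync_orders, 'cron', id=job_id, minute=minute)

        def exposed_remove_job(self, job_id):
            scheduler.remove_job(job_id)

    if __name__ == '__main__':
        scheduler = BackgroundScheduler()
        scheduler.start()
        ThreadedServer(SchedulerService, port=12345).start()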

    2.2 Adding scheduled jobs through rpyc callbacks

    server.py
    import datetime
    import uvicorn
    from fastapi import FastAPI, Body
    import rpyc
    
    app = FastAPI(title='fast-api')
    
    conn = None
    bgsrv = None
    mon = None
    
    @app.on_event('startup')
    def init_scheduler():
        """Initialize the rpyc connection on startup"""
        global conn, bgsrv, mon
        conn = rpyc.connect("localhost", 12345)
        # create a bg thread to process incoming events
        bgsrv = rpyc.BgServingThread(conn)  # the serving thread must be running for callbacks to execute
        mon = conn.root.Monitor(print_time)
    
    def print_time(name):
        print(f'{name} - {datetime.datetime.now()}')
    
    def from_crontab(cron):
        values = cron.split(' ')
        return {
            'minute': values[0],
            'hour': values[1],
            'day': values[2],
            'month': values[3],
            'day_of_week': values[4],
        }
    
    
    @app.post('/add-job')
    async def add_job(job_id: str = Body(...), cron: str = Body(...)):
        """添加job"""
        mon.add_job(id=job_id, args=(job_id, ), trigger='cron',  **from_crontab(cron))
        return {"msg": "success!"}
    
    @app.post('/remove-job')
    async def remove_job(job_id: str = Body(..., embed=True)):
        """移除job"""
        mon.remove_job(job_id)
        return {"msg": "success!"}
    
    if __name__ == '__main__':
        uvicorn.run(app, host='127.0.0.1', port=5566)
    
    rpc.py
    import rpyc
    from rpyc.utils.server import ThreadedServer
    from apscheduler.schedulers.background import BackgroundScheduler
    
    class SchedulerService(rpyc.Service):
    
        class exposed_Monitor(object):   # exposing names is not limited to methods :)
            def __init__(self, callback):
                # the callback must be wrapped with rpyc.async_ so it is invoked asynchronously
                self.callback = rpyc.async_(callback)
    
            def exposed_add_job(self, *args, **kwargs):
                print("添加任务:", args, kwargs)
                return scheduler.add_job(self.callback, *args, **kwargs)
    
            def exposed_pause_job(self, job_id, jobstore=None):
                return scheduler.pause_job(job_id, jobstore)
    
            def exposed_resume_job(self, job_id, jobstore=None):
                return scheduler.resume_job(job_id, jobstore)
    
            def exposed_remove_job(self, job_id, jobstore=None):
                scheduler.remove_job(job_id, jobstore)
    
    
    if __name__ == '__main__':
        scheduler = BackgroundScheduler()
        scheduler.start()
        protocol_config = {'allow_public_attrs': True}
        server = ThreadedServer(SchedulerService, port=12345, protocol_config=protocol_config)
        try:
            server.start()
        except (KeyboardInterrupt, SystemExit):
            pass
        finally:
            scheduler.shutdown()
    

    2.3 Scheduling jobs as HTTP requests from the rpyc service

    2.3.1 The rpyc service backend (a sketch follows)
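
    The backend code is not shown in the original; a minimal sketch, assuming the service schedules jobs that call the /run-task endpoint from 2.3.3 over HTTP (API_URL, request_task, and the addresses are assumptions, not from the source):

    # rpc_http.py - hedged sketch of the rpyc backend for 2.3.1
    import requests
    import rpyc
    from rpyc.utils.server import ThreadedServer
    from apscheduler.schedulers.background import BackgroundScheduler

    API_URL = 'http://127.0.0.1:5566/run-task'  # assumed FastAPI address

    def request_task(task_id):
        # the job body is just an HTTP call back into the main project
        requests.get(API_URL, params={'task_id': task_id}, timeout=30)

    class SchedulerService(rpyc.Service):
        def exposed_add_request_job(self, task_id, **kwargs):
            # kwargs carries id=..., trigger='cron' and the crontab fields
            scheduler.add_job(request_task, args=(task_id,), **kwargs)

        def exposed_remove_job(self, job_id):
            scheduler.remove_job(job_id)

    if __name__ == '__main__':
        scheduler = BackgroundScheduler()
        scheduler.start()
        ThreadedServer(SchedulerService, port=12345,
                       protocol_config={'allow_public_attrs': True}).start()
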
    2.3.2 Client example
    #!/usr/bin/env python
    
    import os
    import rpyc
    import logging
    
    from app.conf import common_config
    
    fast_api_env = os.environ.get('FAST_API_ENV')
    
    
    def from_crontab(cron):
        values = cron.split(' ')
        return {
            'minute': values[0],
            'hour': values[1],
            'day': values[2],
            'month': values[3],
            'day_of_week': values[4],
        }
    
    
    def run_task():
        """启动任务"""
        try:
            rpyc_conn = rpyc.connect(common_config.HOST, common_config.PORT)
        except ConnectionRefusedError:
            logging.error('run scheduler tasks err, connect err!')
            return False
        # bgsrv = rpyc.BgServingThread(rpyc_conn)   # 异步回调需要开启线程处理器
        # creates a bg thread to process incoming events
        logging.info('run scheduler tasks')
        # 添加job1
        rpyc_conn.root.add_request_job(id=f'job1', task_id='job1',, trigger='cron',**from_crontab('* * * * *'))
        # 添加job2
        rpyc_conn.root.add_request_job(id=f'job2', task_id='job2', trigger='cron', **from_crontab('* * * * *'))
    
        # bgsrv.stop()
        rpyc_conn.close()
        logging.info('run scheduler tasks success!')
    
    
    2.3.3 Server-side endpoint
    from fastapi import APIRouter, Depends, Query
    from sqlalchemy.orm import Session
    
    from app.schemas.response import Response
    from app.api.utils.common import get_db
    
    # the actual scheduled-task implementations
    from app.service.jobs import job1, job2
    
    router = APIRouter()
    
    @router.get("/run-task", response_model=Response)
    def run_task(
            task_id: str = Query(...),
            db: Session = Depends(get_db),
    ):
        """调用接口,自定义的task_id对应一个job任务,可选择数据库存储映射关系"""
        if task_id == 'job1':
            job1(db)
        elif task_id == 'job2':
            job2(db)
        else:
            return {'code': 1, 'error': '无效任务类型'}
        return {'msg': 'completed!'}
    
    

    2.4 Problems

    1. With 2.1, the business code has to be written a second time inside the rpyc service, so it inevitably drifts away from the main project
    • If the scheduled business code has no ties to the project code, this approach works fine
    2. With 2.2 (adding jobs through rpyc callbacks), the FastAPI server and the rpyc service need a long-lived connection, persistent job storage is not possible, and the two services are tightly coupled
    • If the long-lived connection is acceptable, this approach can be used
    3. With 2.3 (the rpyc service scheduling jobs as HTTP requests), the call path takes a long detour
    • But it removes the tight coupling between services, and it is the approach I personally recommend; if it feels clumsy, use another stack (e.g. Celery + a message queue) for scheduled tasks