FastAPI + APScheduler Dynamic Scheduled Tasks


    Contents

    1. APScheduler

    2. Practical Application

    APScheduler

    1. Installation

    pip install apscheduler

    2. Core Components

    • triggers: determine when a job should run
    • job stores: where jobs are stored
    • executors: run the jobs
    • schedulers: tie the other components together

    3. Choosing a Scheduler, Job Store, Executor, and Trigger

    3.1 Schedulers

    • BlockingScheduler: the scheduler is the only thing running in the process
    • BackgroundScheduler: none of the frameworks below apply and you want the scheduler to run in the background
    • AsyncIOScheduler: your application uses asyncio
    • GeventScheduler: your application uses gevent
    • TornadoScheduler: you are building a Tornado application
    • TwistedScheduler: you are building a Twisted application
    • QtScheduler: you are building a Qt application

    3.2 Job Stores

    • Persistent jobs: use SQLAlchemyJobStore and set its storage connection URL
    • Non-persistent jobs, recreated on every restart: the default in-memory MemoryJobStore

    3.3 Executors

    • ProcessPoolExecutor: for CPU-intensive work; an optional process pool that can also be used alongside the thread pool
    • ThreadPoolExecutor: the default thread pool

    3.4 Triggers

    • date: run a job once at a specific point in time
    • interval: run a job at fixed time intervals
    • cron: run a job periodically at given times, like linux crontab

    4. Configuring the Scheduler

    from pytz import utc
    
    from apscheduler.schedulers.background import BackgroundScheduler
    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
    from apscheduler.executors.pool import ProcessPoolExecutor
    
    
    jobstores = {
        # multiple job stores can be configured
        'mongo': {'type': 'mongodb'},
        'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')  # SQLAlchemyJobStore with a storage connection URL
    }
    executors = {
        'default': {'type': 'threadpool', 'max_workers': 20},     # at most 20 worker threads
        'processpool': ProcessPoolExecutor(max_workers=5)         # at most 5 worker processes
    }
    job_defaults = {
        'coalesce': False,   # do not coalesce missed runs (e.g. after delays or errors) into a single run
        'max_instances': 3   # maximum number of concurrently running instances of the same job
    }
    scheduler = BackgroundScheduler()
    
    # .. do something else here, maybe add jobs etc.
    
    scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)  # use UTC as the scheduler's timezone
    

    5. Adding, Removing, Pausing, and Listing Jobs

    import os
    import time
    from apscheduler.schedulers.background import BackgroundScheduler
    
    def print_time(name):
        print(f'{name} - {time.ctime()}')
    
    def add_job(job_id, func, args, seconds):
        """Add a job."""
        print(f"add job - {job_id}")
        scheduler.add_job(id=job_id, func=func, args=args, trigger='interval', seconds=seconds)
    
    def remove_job(job_id):
        """Remove a job."""
        scheduler.remove_job(job_id)
        print(f"remove job - {job_id}")
    
    def pause_job(job_id):
        """Pause a job."""
        scheduler.pause_job(job_id)
        print(f"pause job - {job_id}")
    
    def resume_job(job_id):
        """Resume a paused job."""
        scheduler.resume_job(job_id)
        print(f"resume job - {job_id}")
    
    def get_jobs():
        """Get all jobs, including paused ones."""
        res = scheduler.get_jobs()
        print(f"all jobs - {res}")
    
    def print_jobs():
        print("detailed job info")
        scheduler.print_jobs()
    
    def start():
        """Start the scheduler."""
        scheduler.start()
    
    def shutdown():
        """Shut down the scheduler."""
        scheduler.shutdown()
    
    if __name__ == '__main__':
        scheduler = BackgroundScheduler()
        start()
        print('Press Ctrl+{0} to exit\n'.format('Break' if os.name == 'nt' else 'C'))
        add_job('job_A', func=print_time, args=("A", ), seconds=1)
        add_job('job_B', func=print_time, args=("B", ), seconds=2)
        time.sleep(6)
        pause_job('job_A')
        get_jobs()
        time.sleep(6)
        print_jobs()
        resume_job('job_A')
        time.sleep(6)
        remove_job('job_A')
        time.sleep(6)
        try:
            shutdown()
        except RuntimeError:
            pass
    

    6. Scheduler Events

    Event listeners can be attached to the scheduler. Scheduler events are fired on certain occasions and may carry additional information about the details of that particular event. By giving add_listener() an appropriate mask argument, or OR-ing different event constants together, you can listen to only particular types of events. The listener callable is called with one argument, the event object.

    from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
    
    def my_listener(event):
        if event.exception:
            print('The job crashed :(')
        else:
            print('The job worked :)')
    
    scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
    

    7. Configuring Logging

    import logging
    
    logging.basicConfig()
    logging.getLogger('apscheduler').setLevel(logging.DEBUG)
    

    Practical Application

    1. Adding Scheduled Jobs Dynamically with FastAPI

    import asyncio
    import datetime
    import uvicorn
    from fastapi import FastAPI, Body
    from apscheduler.schedulers.asyncio import AsyncIOScheduler
    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
    from apscheduler.executors.pool import ProcessPoolExecutor
    from apscheduler.triggers.cron import CronTrigger
    
    app = FastAPI(title='fast-api')
    
    scheduler = None
    
    
    @app.on_event('startup')
    def init_scheduler():
        """Initialize the scheduler on startup."""
        jobstores = {
            'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')  # SQLAlchemyJobStore with a storage connection URL
        }
        executors = {
            'default': {'type': 'threadpool', 'max_workers': 20},  # at most 20 worker threads
            'processpool': ProcessPoolExecutor(max_workers=5)  # at most 5 worker processes
        }
        global scheduler
        scheduler = AsyncIOScheduler()
        scheduler.configure(jobstores=jobstores, executors=executors)
        # adding a coroutine job here; the results are disappointing...
        scheduler.add_job(tick, 'interval', seconds=3)
        print("Starting scheduler...")
    
        scheduler.start()
    
    
    def print_time(name):
        print(f'{name} - {datetime.datetime.now()}')
    
    
    async def tick():
        print('Tick! The time is: %s' % datetime.datetime.now())
        await asyncio.sleep(1)
    
    
    @app.post('/add-job')
    async def add_job(job_id: str = Body(...), cron: str = Body(...)):
        """Add a job."""
        scheduler.add_job(id=job_id, func=print_time, args=(job_id, ), trigger=CronTrigger.from_crontab(cron))
        return {"msg": "success!"}
    
    
    @app.post('/remove-job')
    async def remove_job(job_id: str = Body(..., embed=True)):
        """Remove a job."""
        scheduler.remove_job(job_id)
        return {"msg": "success!"}
    
    
    if __name__ == '__main__':
        uvicorn.run(app, host='127.0.0.1', port=5566)
    

    1.1 Problems

    1. AsyncIOScheduler() has trouble running coroutine jobs
    2. In a multi-process deployment, apscheduler loads the jobs once per process, so each job runs multiple times. How can this be solved?
      • The official recommendation is to isolate the scheduler behind http, rpyc, grpc, etc.
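    Besides the official suggestions, one common workaround (my addition, not from the APScheduler docs) is to let only the process that wins a cross-process lock start the scheduler; a minimal sketch using a localhost port bind as the lock:

```python
import socket

def try_acquire_scheduler_lock(port=54219):
    """Try to bind a localhost port as a cross-process lock.

    Only one process can bind the port; that process should start the
    scheduler. Keep the returned socket alive for the process lifetime;
    closing it releases the lock.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.bind(("127.0.0.1", port))
        return sock
    except OSError:
        sock.close()
        return None

# In each worker process:
# lock = try_acquire_scheduler_lock()
# if lock is not None:
#     scheduler.start()   # only this process runs the jobs
```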

    2. Adding Scheduled Jobs via an rpyc Service

    2.1 The Job Business Code Lives in the rpyc Service

    2.2 Adding Scheduled Jobs via rpyc Callbacks

    server.py
    import datetime
    import uvicorn
    from fastapi import FastAPI, Body
    import rpyc
    
    app = FastAPI(title='fast-api')
    
    conn = None
    bgsrv = None
    mon = None
    
    @app.on_event('startup')
    def init_scheduler():
        """Initialize the rpyc connection."""
        global conn, bgsrv, mon
        conn = rpyc.connect("localhost", 12345)
        # create a bg thread to process incoming events
        bgsrv = rpyc.BgServingThread(conn)  # the serving thread must be running for callbacks to fire
        mon = conn.root.Monitor(print_time)
    
    def print_time(name):
        print(f'{name} - {datetime.datetime.now()}')
    
    def from_crontab(cron):
        values = cron.split(' ')
        return {
            'minute': values[0],
            'hour': values[1],
            'day': values[2],
            'month': values[3],
            'day_of_week': values[4],
        }
    
    
    @app.post('/add-job')
    async def add_job(job_id: str = Body(...), cron: str = Body(...)):
        """Add a job."""
        mon.add_job(id=job_id, args=(job_id, ), trigger='cron',  **from_crontab(cron))
        return {"msg": "success!"}
    
    @app.post('/remove-job')
    async def remove_job(job_id: str = Body(..., embed=True)):
        """Remove a job."""
        mon.remove_job(job_id)
        return {"msg": "success!"}
    
    if __name__ == '__main__':
        uvicorn.run(app, host='127.0.0.1', port=5566)
    
    rpc.py
    import rpyc
    from rpyc.utils.server import ThreadedServer
    from apscheduler.schedulers.background import BackgroundScheduler
    
    class SchedulerService(rpyc.Service):
    
        class exposed_Monitor(object):   # exposing names is not limited to methods :)
            def __init__(self, callback):
                # the callback must be wrapped with rpyc.async_ so it runs asynchronously
                self.callback = rpyc.async_(callback)
    
            def exposed_add_job(self, *args, **kwargs):
                print("adding job:", args, kwargs)
                return scheduler.add_job(self.callback, *args, **kwargs)
    
            def exposed_pause_job(self, job_id, jobstore=None):
                return scheduler.pause_job(job_id, jobstore)
    
            def exposed_resume_job(self, job_id, jobstore=None):
                return scheduler.resume_job(job_id, jobstore)
    
            def exposed_remove_job(self, job_id, jobstore=None):
                scheduler.remove_job(job_id, jobstore)
    
    
    if __name__ == '__main__':
        scheduler = BackgroundScheduler()
        scheduler.start()
        protocol_config = {'allow_public_attrs': True}
        server = ThreadedServer(SchedulerService, port=12345, protocol_config=protocol_config)
        try:
            server.start()
        except (KeyboardInterrupt, SystemExit):
            pass
        finally:
            scheduler.shutdown()
    

    2.3 The rpyc Service Schedules Jobs as HTTP Requests

    2.3.1 The rpyc Service Backend
    2.3.2 Invocation Example
    #!/usr/bin/env python
    
    import os
    import rpyc
    import logging
    
    from app.conf import common_config
    
    fast_api_env = os.environ.get('FAST_API_ENV')
    
    
    def from_crontab(cron):
        values = cron.split(' ')
        return {
            'minute': values[0],
            'hour': values[1],
            'day': values[2],
            'month': values[3],
            'day_of_week': values[4],
        }
    
    
    def run_task():
        """Start the tasks."""
        try:
            rpyc_conn = rpyc.connect(common_config.HOST, common_config.PORT)
        except ConnectionRefusedError:
            logging.error('run scheduler tasks err, connect err!')
            return False
        # bgsrv = rpyc.BgServingThread(rpyc_conn)   # async callbacks need a bg thread to process incoming events
        logging.info('run scheduler tasks')
        # add job1
        rpyc_conn.root.add_request_job(id='job1', task_id='job1', trigger='cron', **from_crontab('* * * * *'))
        # add job2
        rpyc_conn.root.add_request_job(id='job2', task_id='job2', trigger='cron', **from_crontab('* * * * *'))
    
        # bgsrv.stop()
        rpyc_conn.close()
        logging.info('run scheduler tasks success!')
    
    
    2.3.3 The Server-Side Endpoint
    from fastapi import APIRouter, Depends, Query
    from sqlalchemy.orm import Session
    
    from app.schemas.response import Response
    from app.api.utils.common import get_db
    
    # the actual job implementations
    from app.service.jobs import job1, job2
    
    router = APIRouter()
    
    @router.get("/run-task", response_model=Response)
    def run_task(
            task_id: str = Query(...),
            db: Session = Depends(get_db),
    ):
        """Each custom task_id maps to one job; the task_id-to-job mapping can optionally be stored in the database."""
        if task_id == 'job1':
            job1(db)
        elif task_id == 'job2':
            job2(db)
        else:
            return {'code': 1, 'error': 'invalid task type'}
        return {'msg': 'completed!'}
    
    

    2.4 Problems

    1. With approach 2.1, the business code has to be rewritten inside the rpyc service, so it inevitably drifts away from the main project
      • This approach works if the scheduled business code has no ties to the original project
    2. With approach 2.2, adding jobs via rpyc callbacks, the drawbacks are that the FastAPI server needs a long-lived connection to the rpyc service, persistent job storage is not possible, and the two services are tightly coupled
      • This approach works if the long-lived connection is acceptable
    3. Approach 2.3, where the rpyc service schedules jobs as HTTP requests, takes quite a detour
      • But it removes the tight coupling between services, so it is the approach I personally recommend; if it feels too clumsy, consider another stack (e.g. celery + MQ) for scheduled tasks
  • Original article: https://www.cnblogs.com/zhangliang91/p/12468871.html