zoukankan      html  css  js  c++  java
  • apschedule基本使用

    安装:

    pip install apschedule

    作用:

    实现定时或间隔任务

    基本概念:

    分为以下四个部分:

    • 触发器(triggers): 设定触发任务的条件
    • 任务存储器(job stores):存放任务的地方,分为内存或数据库,默认放在内存中,一个存储器不要给多个调度器用,容易造成混乱
    • 执行器(executors):用于执行任务,分为进程池或者线程池,任务执行完毕后会通知给调度器
    • 调度器(schedule):将以上三个组件的设置作为参数,通过创建调度器来实现任务

    调度器类型:

    • BlockingScheduler 阻塞式调度器:适用于只跑调度器的程序。
    • BackgroundScheduler 后台调度器:适用于非阻塞的情况,调度器会在后台独立运行。
    • AsyncIOScheduler AsyncIO调度器,适用于应用使用AsnycIO的情况。
    • GeventScheduler Gevent调度器,适用于应用通过Gevent的情况。
    • TornadoScheduler Tornado调度器,适用于构建Tornado应用。
    • TwistedScheduler Twisted调度器,适用于构建Twisted应用。
    • QtScheduler Qt调度器,适用于构建Qt应用。

    任务储存器:要看任务是否需要持久化。如果是无状态的,选择默认任务储存器MemoryJobStore就可以。但是,如果需要在程序关闭或重启时,保存任务的状态,就要选择持久化的任务储存器。作者推荐使用SQLAlchemyJobStore并搭配PostgreSQL作为后台数据库。这个方案可以提供强大的数据整合与保护功能。

    执行器:默认的ThreadPoolExecutor线程池执行器方案可以满足大部分需求。不过程序如果是计算密集型的,那么最好用ProcessPoolExecutor进程池执行器方案来充分利用多核算力。也可以将ProcessPoolExecutor作为第二执行器,混合使用两种不同的执行器。

    触发器的种类:

    • date 日期:触发任务运行的具体日期
    • interval 间隔:触发任务运行的时间间隔
    • cron 周期:触发任务运行的周期

    一个任务也可以设定多种触发器,比如,可以设定同时满足所有触发器条件而触发,或者满足一项即触发。复合触发器

    触发器示例:

    date:

    from datetime import datetime
    
    
    from apscheduler.schedulers.blocking import BlockingScheduler
    
    
    def task():
        print('test')
    
    
    schedule = BlockingScheduler()
    schedule.add_job(task, 'date', run_date=datetime(2021, 7, 4, 17, 4, 12)) 
    # 也可以写成字符的形式:'2021-7-4 17:04:12',如果不指定时间,则立即执行。后面可接args参数,用于给函数传参 schedule.start()

     interval:

    from apscheduler.schedulers.blocking import BlockingScheduler
    
    
    def task():
        print('test')
    
    
    schedule = BlockingScheduler()
    schedule.add_job(task, 'interval', seconds=2, start_date='2021-07-03 18:17:00', end_date='2021-07-03 18:18:00')
    schedule.start()

    cron:

    参数:

    class apscheduler.triggers.cron.CronTrigger(
    year=None, 
    month=None, 
    day=None, 
    week=None, 
    day_of_week=None, 
    hour=None, 
    minute=None,
    second=None, 
    start_date=None, 
    end_date=None, 
    timezone=None, 
    jitter=None)
    

    当省略时间参数时,在显式指定参数之前的参数会被设定为*,之后的参数会被设定为最小值,week 和day_of_week的最小值为*。比如,设定day=1, minute=20等同于设定year='*', month='*', day=1, week='*', day_of_week='*', hour='*', minute=20, second=0,即每个月的第一天,且当分钟到达20时就触发。

    from apscheduler.schedulers.blocking import BlockingScheduler
    
    
    def task():
        print('test')
    
    
    schedule = BlockingScheduler()
    schedule.add_job(task, 'cron', minute='23')
    # 同样可以用start_date和end_date来控制时间的范围
    schedule.start()
    

    用装饰器的方式执行调度器

    from apscheduler.schedulers.blocking import BlockingScheduler
    
    schedule = BlockingScheduler()
    
    
    # jitter 是随机浮动秒数,在分布式部署中设置可以避免所有服务器同时执行任务
    @schedule.scheduled_job('cron', minute='29', jitter=2)
    def task():
        print('test')
    
    
    # schedule.add_job(task, 'cron', minute='23')
    schedule.start()

    调度器配置:

    from pytz import utc
    
    from apscheduler.schedulers.background import BackgroundScheduler
    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
    from apscheduler.executors.pool import ProcessPoolExecutor
    
    
    jobstores = {
        'mongo': {'type': 'mongodb'},
        'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
    }
    executors = {
        'default': {'type': 'threadpool', 'max_workers': 20},
        'processpool': ProcessPoolExecutor(max_workers=5)
    }
    job_defaults = {
        'coalesce': False,
        'max_instances': 3
    }
    scheduler = BackgroundScheduler()
    
    # ..这里可以添加任务
    
    scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
    

    以下例子均是在以add_job的形式下添加任务的操作

    移除任务:

    job = schedule.add_job(task, 'interval', seconds=2)
    job.remove()

    # 或者是这种方式
    job = schedule.add_job(task, 'interval', seconds=2, id='1')

    schedule.remove_job('1')

    暂停 回复任务 关闭调度器:

    schedule.pause()  # 暂停任务
    schedule.resume()  # 回复任务
    schedule.shutdown(wait=False)  # 立刻关闭调度器,如果不加wait参数,则会等任务都执行完后再关闭调度器

    修改任务:

    通过apscheduler.job.Job.modify()modify_job(),你可以修改任务当中除了id的任何属性。

    job.modify(max_instances=6, name='Alternate name')
    # max_instances 设置运行的最大示例数

    如果想要重新调度任务(就是改变触发器),你能通过apscheduler.job.Job.reschedule()reschedule_job()来实现。这些方法会重新创建触发器,并重新计算下次运行时间。

    scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
    

    丢失任务的执行与合并

    有时,任务会由于一些问题没有被执行。最常见的情况就是,在数据库里的任务到了该执行的时间,但调度器被关闭了,那么这个任务就成了“哑弹任务”。错过执行时间后,调度器才打开了。这时,调度器会检查每个任务的misfire_grace_time参数int值,即哑弹上限,来确定是否还执行哑弹任务(这个参数可以全局设定的或者是为每个任务单独设定)。此时,一个哑弹任务,就可能会被连续执行多次。

    但这就可能导致一个问题,有些哑弹任务实际上并不需要被执行多次。coalescing合并参数就能把一个多次的哑弹任务揉成一个一次的哑弹任务。也就是说,coalescingTrue能把多个排队执行的同一个哑弹任务,变成一个,而不会触发哑弹事件。

    注!如果是由于线程池/进程池满了导致的任务延迟,执行器就会跳过执行。要避免这个问题,可以添加进程或线程数来实现或把 misfire_grace_time值调高。

    调度器监听事件:

    调度器允许添加事件侦听器。部分事件会有特有的信息,比如当前运行次数等。add_listener(callback,mask)中,第一个参数是回调对象,mask是指定侦听事件类型,mask参数也可以是逻辑组合。回调对象会有一个参数就是触发的事件。

    def my_listener(event):
        if event.exception:
            print('The job crashed :(')
        else:
            print('The job worked :)')
    
    # 当任务执行完或任务出错时,调用my_listener
    scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

    # 可从 apscheduler.events来查看监听任务的状态
  • 相关阅读:
    nginx负载均衡
    mysqld: Out of memory Centos 创建swap分区解决
    redis 基本命令
    查看日志常用命令
    StringIO和BytesIO
    paramiko初识
    微信小程序-drf登录认证组件
    微信小程序之模块化--module.exports
    celery 定时任务报错一
    微信小程序跨页面传值
  • 原文地址:https://www.cnblogs.com/fdsimin/p/14967432.html
Copyright © 2011-2022 走看看