zoukankan      html  css  js  c++  java
  • APScheduler

    简介

    APScheduler:强大的任务调度工具,可以完成定时任务周期任务等,它是跨平台的,用于取代Linux下的cron daemon或者Windows下的task scheduler。

    内置三种调度调度系统:

    • Cron风格
    • 间隔性执行
    • 仅在某个时间执行一次

    作业存储的backends支持:

    • Memory
    • SQLAlchemy (any RDBMS supported by SQLAlchemy works)
    • MongoDB
    • Redis
    • RethinkDB
    • ZooKeeper

    基本概念:4个组件
    triggers: 描述一个任务何时被触发,有按日期、按时间间隔、按cronjob描述式三种触发方式
    job stores: 任务持久化仓库,默认保存任务在内存中,也可将任务保存都各种数据库中,任务中的数据序列化后保存到持久化数据库,从数据库加载后又反序列化。
    executors: 执行任务模块,当任务完成时executors通知schedulers,schedulers收到后会发出一个适当的事件
    schedulers: 任务调度器,控制器角色,通过它配置job stores和executors,添加、修改和删除任务。

    插件机制: 供用户自由选择scheduler, job store(s), executor(s) and trigger(s)

    scheduler

    scheduler的主循环(main_loop),其实就是反复检查是不是有到时需要执行的任务,完成一次检查的函数是_process_jobs, 这个函数做这么几件事:

    • 询问自己的每一个jobstore,有没有到期需要执行的任务(jobstore.get_due_jobs())
    • 如果有,计算这些job中每个job需要运行的时间点(run_times = job._get_run_times(now))如果run_times有多个,这种情况我们上面讨论过,有coalesce检查
      提交给executor排期运行(executor.submit_job(job, run_times))
    • 那么在这个_process_jobs的逻辑,什么时候调用合适呢?如果不间断地调用,而实际上没有要执行的job,是一种浪费。每次掉用_process_jobs后,其实可以预先判断一下,下一次要执行的job(离现在最近的)还要多长时间,作为返回值告诉main_loop, 这时主循环就可以去睡一觉,等大约这么长时间后再唤醒,执行下一次_process_jobs。这里唤醒的机制就会有IO模型的区别了

    scheduler由于IO模型的不同,可以有多种实现,内置scheduler供选:

    • BlockingScheduler: scheduler在当前进程的主线程中运行,所以调用start函数会阻塞当前线程,不能立即返回。
    • BackgroundScheduler: 放到后台线程中运行,所以调用start后主线程不会阻塞
    • AsyncIOScheduler: 使用asyncio模块
    • GeventScheduler: 使用gevent作为IO模型,和GeventExecutor配合使用
    • TornadoScheduler: 配合TwistedExecutor,用reactor.callLater完成定时唤醒
    • TwistedScheduler: 使用tornado的IO模型,用ioloop.add_timeout完成定时唤醒
    • QtScheduler: 使用QTimer完成定时唤醒

    jobstore

    jobstore提供给scheduler一个序列化jobs的统一抽象,提供对scheduler中job的增删改查接口,根据存储backend的不同,分以下几种
    内置job stores供选:

    • MemoryJobStore:没有序列化,jobs就存在内存里,增删改查也都是在内存中操作
    • SQLAlchemyJobStore:所有sqlalchemy支持的数据库都可以做为backend,增删改查操作转化为对应backend的sql语句
    • MongoDBJobStore:用mongodb作backend
    • RedisJobStore: 用redis作backend

    除了MemoryJobStore外,其他几种都使用pickle做序列化工具,所以这里要指出一点,如果你不是在用内存做jobstore,那么必须确保你提供给job的可执行函数必须是可以被全局访问的,也就是可以通过ref_to_obj反查出来的,否则无法序列化。
    使用数据库做jobstore,就会发现,其实创建了一张有三个域的的jobs表,分别是****id, next_run_time, job_state,其中job_state是job对象pickle序列化后的二进制**,而id和next_run_time则是支持job的两类查询(按id和按最近运行时间)

    executor

    aps把任务最终的执行机制也抽象了出来,可以根据IO模型选配,不需要讲太多,最常用的是threadpool和processpoll两种(来自concurrent.futures的线程/进程池)。

    不同类型的executor实现自己的_do_submit_job,完成一次实际的任务实例执行。以线程/进程池实现为例
    内置executors供选:

    • ProcessPoolExecutor: 多进程,可指定进程数,当工作负载为CPU密集型操作时可以考虑使用它来利用多核CPU
    • ThreadPoolExecutor: 多线程,可指定线程数,默认,可以满足大多数用途
    • AsyncIOExecutor
    • DebugExecutor
    • GeventExecutor
    • ProcessPoolExecutor
    • ThreadPoolExecutor
    • TwistedExecutor

    trigger

    trigger是抽象出了“一个job是何时被触发”这个策略,每种trigger实现自己的get_next_fire_time函数
    aps提供的trigger包括:

    • date:一次性指定日期
    • interval:在某个时间范围内间隔多长时间执行一次
    • cron:和unix crontab格式兼容,最为强大

    默认配置: 使用MemoryJobStore和ThreadPoolExecutor
    优点:插件化思想和抽象出接口,策略与不同实现机制分离

    User guide

    配置scheduler
    官网提供了等价的三种方法,第一种比较简洁明了。

    from pytz import utc
    
    from apscheduler.schedulers.background import BackgroundScheduler
    from apscheduler.jobstores.mongodb import MongoDBJobStore
    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
    from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
    
    jobstores = {
        'mongo': MongoDBJobStore(),
        'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
    }
    executors = {
        'default': ThreadPoolExecutor(20),
        'processpool': ProcessPoolExecutor(5)
    }
    job_defaults = {
        'coalesce': False,
        'max_instances': 3
    }
    scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
    
    from apscheduler.schedulers.background import BackgroundScheduler
    # 使用默认配置,即MemoryJobStore和ThreadPoolExecutor(10)
    scheduler = BackgroundScheduler()
    

    启动调度器
    调用调度器的start()方法

    添加任务
    两种方式:

    • 调用调度器的add_job()
    • 使用调度器的scheduled_job()装饰器: 很简洁,推荐这种。

    其他的不常用操作如移除任务、暂停和恢复任务、获取调度了的任务列表、修改任务、关停调度器、暂停/恢复任务处理等见文档:http://apscheduler.readthedocs.io/en/latest/userguide.html

    限制并发执行的任务实例数量
    默认同一时刻只能有一个实例运行,通过max_instances=3修改为3个。

    错过执行的任务与合并
    misfire_grace_time:如果一个job本来14:00有一次执行,但是由于某种原因没有被调度上,现在14:01了,这个14:00的运行实例被提交时,会检查它预订运行的时间和当下时间的差值(这里是1分钟),大于我们设置的30秒限制,那么这个运行实例不会被执行。
    合并:最常见的情形是scheduler被shutdown后重启,某个任务会积攒了好几次没执行如5次,下次这个job被submit给executor时,执行5次。将coalesce=True后,只会执行一次

    Scheduler 事件
    监听Scheduler发出的事件并作出处理,如任务执行完、任务出错等

    def my_listener(event):
        if event.exception:
            print('The job crashed :(') # or logger.fatal('The job crashed :(')
        else:
            print('The job worked :)')
    
    scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
    

    参考资料:
    http://www.cnblogs.com/quijote/p/4385774.html




  • 相关阅读:
    2020软件工程作业01
    问题清单
    2020软件工程个人作业06——软件工程实践总结作业
    感谢随笔
    2020软件工程作业05
    2020软件工程作业04
    2020软件工程作业03
    2020软件工程作业02
    2020软件工程作业01
    2020软件工程个人作业06——软件工程实践总结作业
  • 原文地址:https://www.cnblogs.com/ExMan/p/10430964.html
Copyright © 2011-2022 走看看