zoukankan      html  css  js  c++  java
  • Python下定时任务框架APScheduler的使用

    1.APScheduler简介: 

          APScheduler是Python的一个定时任务框架,可以很方便的满足用户定时执行或者周期执行任务的需求,它提供了基于日期date、固定时间间隔interval 、以及类似于Linux上的定时任务crontab类型的定时任务。并且该框架不仅可以添加、删除定时任务,还可以将任务存储到数据库中,实现任务的持久化,所以使用起来非常方便。

    2.APScheduler安装:

          APScheduler的安装相对来说也非常简单,可以直接利用pip安装,如果没有pip可以下载源码,利用源码安装。

          1).利用pip安装:(推荐) 

          # pip install apscheduler 

          2).基于源码安装:https://pypi.python.org/pypi/APScheduler/ 

          # python setup.py install 

          3.基本概念

          APScheduler有四种组件及相关说明:

          1) triggers(触发器):触发器包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行,除了他们自己初始化配置外,触发器完全是无状态的。

          2)job stores(作业存储):用来存储被调度的作业,默认的作业存储器是简单地把作业任务保存在内存中,其它作业存储器可以将任务作业保存到各种数据库中,支持MongoDB、Redis、SQLAlchemy存储方式。当对作业任务进行持久化存储的时候,作业的数据将被序列化,重新读取作业时在反序列化。

          3) executors(执行器):执行器用来执行定时任务,只是将需要执行的任务放在新的线程或者线程池中运行。当作业任务完成时,执行器将会通知调度器。对于执行器,默认情况下选择ThreadPoolExecutor就可以了,但是如果涉及到一下特殊任务如比较消耗CPU的任务则可以选择ProcessPoolExecutor,当然根据根据实际需求可以同时使用两种执行器。

          4) schedulers(调度器):调度器是将其它部分联系在一起,一般在应用程序中只有一个调度器,应用开发者不会直接操作触发器、任务存储以及执行器,相反调度器提供了处理的接口。通过调度器完成任务的存储以及执行器的配置操作,如可以添加。修改、移除任务作业。  

        APScheduler提供了多种调度器,可以根据具体需求来选择合适的调度器,常用的调度器有:

          BlockingScheduler:适合于只在进程中运行单个任务的情况,通常在调度器是你唯一要运行的东西时使用。

          BackgroundScheduler: 适合于要求任何在程序后台运行的情况,当希望调度器在应用后台执行时使用。

          AsyncIOScheduler:适合于使用asyncio框架的情况

          GeventScheduler: 适合于使用gevent框架的情况

          TornadoScheduler: 适合于使用Tornado框架的应用

          TwistedScheduler: 适合使用Twisted框架的应用

          QtScheduler: 适合使用QT的情况

       1)下面一个简单的示例:

          import time
          from apscheduler.schedulers.blocking import BlockingScheduler
          def test_job():
            print time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
          scheduler = BlockingScheduler()
          '''
          #该示例代码生成了一个BlockingScheduler调度器,使用了默认的默认的任务存储MemoryJobStore,以及默认的执行器ThreadPoolExecutor,并且最大线程数为10。
          '''
          scheduler.add_job(test_job, 'interval', seconds=5, id='test_job')
          '''
          #该示例中的定时任务采用固定时间间隔(interval)的方式,每隔5秒钟执行一次。
          #并且还为该任务设置了一个任务id
        scheduler.start()

      2)如果想执行一些复杂任务,如上边所说的同时使用两种执行器,或者使用多种任务存储方式,并且需要根据具体情况对任务的一些默认参数进行调整。可以参考下面的方式。(源码解析:http://apscheduler.readthedocs.io/en/latest/userguide.html)

    第一种方式:

          from pytz import utc
          from apscheduler.schedulers.background import BackgroundScheduler  # 导入调度器
    from apscheduler.jobstores.mongodb import MongoDBJobStore # 导入作业存储
    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore # 导入作业存储
    from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor # 导入执行器
    jobstores
    = {   'mongo': MongoDBJobStore(),   'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') } executors = {   'default': ThreadPoolExecutor(20),   'processpool': ProcessPoolExecutor(5) } job_defaults = {   'coalesce': False,   'max_instances': 3 } scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

    第二种方式:

          from apscheduler.schedulers.background import BackgroundScheduler
          scheduler = BackgroundScheduler({
            'apscheduler.jobstores.mongo': {
              'type': 'mongodb'
            },
            'apscheduler.jobstores.default': {
              'type': 'sqlalchemy',
              'url': 'sqlite:///jobs.sqlite'
            },
            'apscheduler.executors.default': {
              'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
              'max_workers': '20'
            },
            'apscheduler.executors.processpool': {
              'type': 'processpool',
              'max_workers': '5'
            },
            'apscheduler.job_defaults.coalesce': 'false',
            'apscheduler.job_defaults.max_instances': '3',
            'apscheduler.timezone': 'UTC',
          })

     第三种方式:

          from pytz import utc
          from apscheduler.schedulers.background import BackgroundScheduler
          from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
          from apscheduler.executors.pool import ProcessPoolExecutor
          jobstores = {
            'mongo': {'type': 'mongodb'},
            'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
          }
          executors = {
            'default': {'type': 'threadpool', 'max_workers': 20},
            'processpool': ProcessPoolExecutor(max_workers=5)
          }
          job_defaults = {
            'coalesce': False,
            'max_instances': 3
          }
          scheduler = BackgroundScheduler()
          scheduler.configure(jobstores=jobstores, executors=executors,job_defaults=job_defaults, timezone=utc)

    5.对任务作业的基本操作:

          1).添加作业有两种方式:第一种可以直接调用add_job(),第二种使用scheduled_job()修饰器。

          而add_job()是使用最多的,它可以返回一个apscheduler.job.Job实例,因而可以对它进行修改或者删除,而使用修饰器添加的任务添加之后就不能进行修改。

          #!/usr/bin/env python
          #-*- coding:UTF-8
          import time
          import datetime
          from apscheduler.schedulers.blocking import BlockingScheduler
          def job1(f):
              print time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), f
          def job2(arg1, args2, f):
              print f, args1, args2
          def job3(**args):
              print args

     APScheduler支持以下三种定时任务:

          cron: crontab类型任务

          interval: 固定时间间隔任务

          date: 基于日期时间的一次性任务

    scheduler = BlockingScheduler()

          #循环任务示例

          scheduler.add_job(job1, 'interval', seconds=5, args=(1,), id='test_job1')

          #定时任务示例

          scheduler.add_job(job1, 'cron', second='*/5', args=(1,2,3,), id='test_job2')

          #一次性任务示例

          scheduler.add_job(job1, next_run_time=(datetime.datetime.now() + datetime.timedelta(seconds=10)), args=(1,), id='test_job3')

    传递参数的方式有元组(tuple)、列表(list)、字典(dict)

          注意:不过需要注意采用元组传递参数时后边需要多加一个逗号     

          #基于list

          scheduler.add_job(job2, 'interval', seconds=5, args=['a','b','list'], id='test_job4')

          #基于tuple

          scheduler.add_job(job2, 'interval', seconds=5, args=('a','b','tuple',), id='test_job5')

          #基于dict

          scheduler.add_job(job3, 'interval', seconds=5, kwargs={'f':'dict', 'a':1,'b':2}, id='test_job6)

          print scheduler.get_jobs()

          scheduler.start()

    或者使用scheduled_job()修饰器来添加作业: 

          @sched.scheduled_job('cron', second='*/5' ,id='my_job_id',)

          def test_task():

            print("Hello world!")

    2).获得任务列表:

          可以通过get_jobs方法来获取当前的任务列表,也可以通过get_job()来根据job_id来获得某个任务的信息。并且apscheduler还提供了一个print_jobs()方法来打印格式化的任务列表。

          例如: 

          scheduler.add_job(my_job, 'interval', seconds=5, id='my_job_id' name='test_job')

          print scheduler.get_job('my_job_id')

          print scheduler.get_jobs()

    3).修改任务:

          修改任务的属性可以使用apscheduler.job.Job.modify()或者modify_job()方法,可以修改除了id的其它任何属性。

          例如: 

          job = scheduler.add_job(my_job, 'interval', seconds=5, id='my_job' name='test_job')

          job.modify(max_instances=5, name='my_job')

     4).删除任务:

          删除调度器中的任务有可以用remove_job()根据job ID来删除指定任务或者使用remove(),如果使用remove()需要事先保存在添加任务时返回的实例对象,任务删除后就不会在执行。

          注意:通过scheduled_job()添加的任务只能使用remove_job()进行删除。

          例如: 

          job = scheduler.add_job(my_job, 'interval', seconds=5, id='my_job_id' name='test_job')

          job.remove() 

          或者 

          scheduler.add_job(my_job, 'interval', seconds=5, id='my_job_id' name='test_job')

          scheduler.remove_job('my_job')

    5).暂停与恢复任务:

          暂停与恢复任务可以直接操作任务实例或者调度器来实现。当任务暂停时,它的运行时间会被重置,暂停期间不会计算时间。

          暂停任务: 

          apscheduler.job.Job.pause()

          apscheduler.schedulers.base.BaseScheduler.pause_job() 

          恢复任务 

          apscheduler.job.Job.resume()

          apscheduler.schedulers.BaseScheduler.resume_job()

          

    6).启动调度器

          可以使用start()方法启动调度器,BlockingScheduler需要在初始化之后才能执行start(),对于其他的Scheduler,调用start()方法都会直接返回,然后可以继续执行后面的初始化操作。

          例如: 

          from apscheduler.schedulers.blocking import BlockingScheduler

          def my_job():

            print "Hello world!"

          scheduler = BlockingScheduler()

          scheduler.add_job(my_job, 'interval', seconds=5)

          scheduler.start()

     7).关闭调度器:

          使用下边方法关闭调度器: 

          scheduler.shutdown() 

          默认情况下调度器会关闭它的任务存储和执行器,并等待所有正在执行的任务完成,如果不想等待,可以进行如下操作: 

          scheduler.shutdown(wait=False)

    注意:

          当出现No handlers could be found for logger “apscheduler.scheduler”次错误信息时,说明没有 logging模块的logger存在,所以需要添加上,对应新增内容如下所示(仅供参):

          import logging

          logging.basicConfig(

        level=logging.DEBUG,

            format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',

            datafmt='%a, %d %b %Y %H:%M:%S',

            filename='/var/log/aaa.txt',

            filemode='a'

      )

       

  • 相关阅读:
    366. Find Leaves of Binary Tree输出层数相同的叶子节点
    716. Max Stack实现一个最大stack
    515. Find Largest Value in Each Tree Row查找一行中的最大值
    364. Nested List Weight Sum II 大小反向的括号加权求和
    156. Binary Tree Upside Down反转二叉树
    698. Partition to K Equal Sum Subsets 数组分成和相同的k组
    244. Shortest Word Distance II 实现数组中的最短距离单词
    187. Repeated DNA Sequences重复的DNA子串序列
    java之hibernate之基于主键的双向一对一关联映射
    java之hibernate之基于主键的单向一对一关联映射
  • 原文地址:https://www.cnblogs.com/domestique/p/7814007.html
Copyright © 2011-2022 走看看