zoukankan      html  css  js  c++  java
  • 56.apscheduler调度器介绍

    1.apscheduler介绍:

    1.介绍:

    介绍:
    最近一个程序要用到后台定时任务,看了看python后台任务,一般2个选择,一个是apscheduler,一个celery。apscheduler比较直观简单一点,就选说说这个库吧。网上一搜索,晕死,好多写apscheduler的都是超级老的版本,而且博客之间相互乱抄,错误一大堆。还是自己读官方文档,为大家理一遍吧。
    
    0.安装
    	pip install apscheduler
    

    2.组成:

    1. APScheduler四大组件:
    
    触发器 triggers :用于设定触发任务的条件
    
    任务储存器 job stores:用于存放任务,把任务存放在内存或数据库中
    
    执行器 executors: 用于执行任务,可以设定执行模式为单线程或线程池
    
    调度器 schedulers: 把上方三个组件作为参数,通过创建调度器实例来运行
    
    1.1 触发器 triggers 
    
    触发器包含调度逻辑。每个任务都有自己的触发器,用于确定何时应该运行作业。除了初始配置之外,触发器完全是无状态的。
    
    1.2 任务储存器 job stores
    
    默认情况下,任务存放在内存中。也可以配置存放在不同类型的数据库中。如果任务存放在数据库中,那么任务的存取有一个序列化和反序列化的过程,同时修改和搜索任务的功能也是由任务储存器实现。
    
    注意一个任务储存器不要共享给多个调度器,否则会导致状态混乱
    
    1.3 执行器 executors
    
    任务会被执行器放入线程池或进程池去执行,执行完毕后,执行器会通知调度器。
    
    1.4 调度器 schedulers
    
    一个调度器由上方三个组件构成,一般来说,一个程序只要有一个调度器就可以了。开发者也不必直接操作任务储存器、执行器以及触发器,因为调度器提供了统一的接口,通过调度器就可以操作组件,比如任务的增删改查。
    
    4

    3.调度器组件详解

    根据开发需求选择相应的组件,下面是不同的调度器组件:
    - BlockingScheduler 阻塞式调度器:适用于只跑调度器的程序。
    - BackgroundScheduler 后台调度器:适用于非阻塞的情况,调度器会在后台独立运行。
    - AsyncIOScheduler AsyncIO调度器,适用于应用使用AsnycIO的情况。
    - GeventScheduler Gevent调度器,适用于应用通过Gevent的情况。
    - TornadoScheduler Tornado调度器,适用于构建Tornado应用。
    - TwistedScheduler Twisted调度器,适用于构建Twisted应用。
    - QtScheduler Qt调度器,适用于构建Qt应用。
    

    4.存储器和执行器

    任务器选择:
    	要看任务是否需要持久化。如果你运行的任务是无状态的,选择默认任务储存器MemoryJobStore就可以应付。但是,如果你需要在程序关闭或重启时,保存任务的状态,那么就要选择持久化的任务储存器。作者推荐使用SQLAlchemyJobStore并搭配PostgreSQL作为后台数据库。这个方案可以提供强大的数据整合与保护功能。
    
    执行器选择:
        同样要看你的实际需求。默认的ThreadPoolExecutor线程池执行器方案可以满足大部分需求。如果,你的程序是计算密集型的,那么最好用ProcessPoolExecutor进程池执行器方案来充分利用多核算力。也可以将ProcessPoolExecutor作为第二执行器,混合使用两种不同的执行器。
    	配置一个任务,就要设置一个任务触发器。触发器可以设定任务运行的周期、次数和时间。
    

    5.APScheduler有三种内置的触发器

    date 日期:触发任务运行的具体日期
    interval 间隔:触发任务运行的时间间隔
    cron 周期:触发任务运行的周期
    calendarinterval:当您想要在一天中的特定时间以日历为基础的间隔运行任务时使用
    一个任务也可以设定多种触发器,比如,可以设定同时满足所有触发器条件而触发,或者满足一项即触发。
    
    5.1 date 是最基本的一种调度,作业任务只会执行一次。它表示特定的时间点触发。它的参数如下:
    	参数								说明
    run_date(datetime or str)		任务运行的日期或者时间
    timezone(datetime.tzinfo or str)	指定时区
    
    示例:
    # 在2019年4月15日执行
    scheduler.add_job(my_job, 'date', run_date=date(2019, 4, 15), args=['测试任务'])
    # datetime类型(用于精确时间)
    scheduler.add_job(my_job, 'date', run_date=datetime(2019, 4, 15, 17, 30, 5), args=['测试任务'])
    注意:run_date参数可以是date类型、datetime类型或文本类型。
    scheduler.add_job(my_job, 'date', run_date='2009-11-06 16:30:05', args=['测试任务'])
    
    5.2  interval 周期触发任务
    	固定时间间隔触发。interval 间隔调度,参数如下:
        参数									说明
        weeks(int)							间隔几周
        days(int)							间隔几天
        hours(int)							间隔几小时
        minutes(int)						间隔几分钟
        seconds(int)						间隔多少秒
        start_date(datetime or str)			 开始日期
        end_date(datetime or str)			 结束日期
        timezone(datetime.tzinfo or   str)	  时区
    
    示例:
    # 每2小时触发
    scheduler.add_job(job_func, 'interval', hours=2)
    
    # 在 2019-04-15 17:00:00 ~ 2019-12-31 24:00:00 之间, 每隔两分钟执行一次 job_func 方法
    scheduler .add_job(job_func, 'interval', minutes=2, start_date='2019-04-15 17:00:00' , end_date='2019-12-31 24:00:00')
    
    jitter振动参数,给每次触发添加一个随机浮动秒数,一般适用于多服务器,避免同时运行造成服务拥堵。
    # 每小时(上下浮动120秒区间内)运行`job_function`
    scheduler.add_job(job_func, 'interval', hours=1, jitter=120)
    
    5.3 cron触发器
    在特定时间周期性地触发,和Linux crontab格式兼容。它是功能最强大的触发器。
    cron 参数:
    
    参数										说明
    year(int or str)						年,4位数字
    month(int or str)						月(范围1-12)
    day(int or str)							日(范围1-31)
    week(int or str)						周(范围1-53)
    day_of_week(int or str)					周内第几天或者星期几(范围0-6或者mon,tue,wed,thu,fri,stat,sun)
    hour(int or str)						时(0-23)
    minute(int or str)						分(0-59)
    second(int or str)						秒(0-59)
    start_date(datetime or str)				最早开始日期(含)
    end_date(datetime or str)				最晚结束日期(含)
    timezone(datetime.tzinfo or   str)		 指定时区
    
    表达式类型
    
    表达式					参数类型				描述
    *						所有				通配符。例:minutes=*即每分钟触发
    */a						所有				可被a整除的通配符。
    a-b						所有				范围a-b触发
    a-b/c					所有				范围a-b,且可被c整除时触发
    xth y					日				 第几个星期几触发。x为第几个,y为星期几
    last x					日				 一个月中,最后个星期几触发
    last					日				一个月最后一天触发
    x,y,z					所有				组合表达式,可以组合确定值或上方的表达式
    注意:month和day_of_week参数分别接受的是英语缩写jan– dec 和 mon – sun
    
    示例:
    # 在每年 1-3、7-9 月份中的每个星期一、二中的 00:00, 01:00, 02:00 和 03:00 执行 job_func 任务
    scheduler.add_job(job_func, 'cron', month='1-3,7-9',day='0, tue', hour='0-3')
    
    @scheduler.scheduled_job('cron', id='my_job_id', day='last sun')
    def some_decorated_task():
        print("I am printed at 00:00:00 on the last Sunday of every month!")
        
    注意:夏令时问题
    有些timezone时区可能会有夏令时的问题。这个可能导致令时切换时,任务不执行或任务执行两次。避免这个问题,可以使用UTC时间,或提前预知并规划好执行的问题。
    pri# 在Europe/Helsinki时区, 在三月最后一个周一就不会触发;在十月最后一个周一会触发两次
    sched.add_job(job_function, 'cron', hour=3, minute=30)
    

    6.配置调度程序

    	PScheduler提供了许多不同的方法来配置调度程序。您可以使用配置字典,也可以将选项作为关键字参数传递。您还可以先实例化调度程序,然后添加任务并配置调度程序。这样您就可以在任何环境中获得最大的灵活性
        可以在BaseScheduler类的API引用中找到调度程序级别配置选项的完整列表 。调度程序子类还可以具有其各自API引用中记录的其他选项。各个任务存储和执行程序的配置选项同样可以在其API参考页面上找到。
    	假设您希望在应用程序中使用默认作业存储和默认执行程序运行BackgroundScheduler:
        from apscheduler.schedulers.background import BackgroundScheduler
        scheduler = BackgroundScheduler()
        # 初始化程序
        
        这将为您提供一个BackgroundScheduler,其MemoryJobStore(内存任务储存器)名为“default”,ThreadPoolExecutor(线程池执行器)名为“default”,默认最大线程数为10。
        
        假如你现在有这样的需求,两个任务储存器分别搭配两个执行器;同时,还要修改任务的默认参数;最后还要改时区。可以参考下面例子,它们是完全等价的。
        名称为“mongo”的MongoDBJobStore
        名称为“default”的SQLAlchemyJobStore
        名称为“ThreadPoolExecutor ”的ThreadPoolExecutor,最大线程20个
        名称“processpool”的ProcessPoolExecutor,最大进程5个
        UTC时间作为调度器的时区
        默认为新任务关闭合并模式()
        设置新任务的默认最大实例数为3
        
        方法一:
        from pytz import utc
        from apscheduler.schedulers.background import BackgroundScheduler
        from apscheduler.jobstores.mongodb import MongoDBJobStore
        from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
        from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
        jobstores = {
            'mongo': MongoDBJobStore(),
            'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
        }
        executors = {
            'default': ThreadPoolExecutor(20),
            'processpool': ProcessPoolExecutor(5)
        }
        job_defaults = {
            'coalesce': False,  # 默认为新任务关闭合并模式()
            'max_instances': 3  # 设置新任务的默认最大实例数为3
        }
        scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
        
        方法二:
        from apscheduler.schedulers.background import BackgroundScheduler
        # The "apscheduler." prefix is hard coded
        scheduler = BackgroundScheduler({
            'apscheduler.jobstores.mongo': {
                 'type': 'mongodb'
            },
            'apscheduler.jobstores.default': {
                'type': 'sqlalchemy',
                'url': 'sqlite:///jobs.sqlite'
            },
            'apscheduler.executors.default': {
                'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
                'max_workers': '20'
            },
            'apscheduler.executors.processpool': {
                'type': 'processpool',
                'max_workers': '5'
            },
            'apscheduler.job_defaults.coalesce': 'false',
            'apscheduler.job_defaults.max_instances': '3',
            'apscheduler.timezone': 'UTC',
        })
        
        方法三:
        from pytz import utc
        from apscheduler.schedulers.background import BackgroundScheduler
        from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
        from apscheduler.executors.pool import ProcessPoolExecutor
        jobstores = {
            'mongo': {'type': 'mongodb'},
            'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
        }
        executors = {
            'default': {'type': 'threadpool', 'max_workers': 20},
            'processpool': ProcessPoolExecutor(max_workers=5)
        }
        job_defaults = {
            'coalesce': False,
            'max_instances': 3
        }
        scheduler = BackgroundScheduler()
        # ..这里可以添加任务
        scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
        
        更多请参考:https://apscheduler.readthedocs.io/en/latest/userguide.html#starting-the-scheduler
    # 启动调度器
        启动调度器是只需调用start()即可。除了BlockingScheduler,非阻塞调度器都会立即返回,可以继续运行之后的代码,比如添加任务等。
        对于BlockingScheduler,程序则会阻塞在start()位置,所以,要运行的代码必须写在start()之前。
        注意:调度器启动后,就不可以修改配置。
    

    7.添加任务

    添加任务方法有两种:
    通过调用add_job()
    通过装饰器scheduled_job()
    
    7.1 优缺点
    	第一种方法是最常用的;第二种方法是最方便的,但缺点就是运行时,不能修改任务。
    	第一种add_job()方法会返回一个apscheduler.job.Job实例,这样就可以在运行时,修改或删除任务。
    	在任何时候你都能配置任务。但是如果调度器还没有启动,此时添加任务,那么任务就处于一个暂存的状态。只有当调度器启动时,才会开始计算下次运行时间。
    	还有一点要注意,如果你的执行器或任务储存器是会序列化任务的,那么这些任务就必须符合:
    
        回调函数必须全局可用
        回调函数参数必须也是可以被序列化的
    # 注意:
        重要提醒!
    	如果在程序初始化时,是从数据库读取任务的,那么必须为每个任务定义一个明确的ID,并且使replace_existing=True,否则每次重启程序,你都会得到一份新的任务拷贝,也就意味着任务的状态不会保存。
    	内置任务储存器中,只有MemoryJobStore不会序列化任务;内置执行器中,只有ProcessPoolExecutor会序列化任务。
    	建议:如果想要立刻运行任务,可以在添加任务时省略trigger参数
    
    

    8.移除任务

        如果想从调度器移除一个任务,那么你就要从相应的任务储存器中移除它,这样才算移除了。有两种方式:
        调用remove_job(),参数为:任务ID,任务储存器名称
        在通过add_job()创建的任务实例上调用remove()方法
    
        第二种方式更方便,但前提必须在创建任务实例时,实例被保存在变量中。对于通过scheduled_job()创建的任务,只能选择第一种方式。
    
        当任务调度结束时(比如,某个任务的触发器不再产生下次运行的时间),任务就会自动移除。
    
        job = scheduler.add_job(myfunc, 'interval', minutes=2)
        job.remove()
    
        同样,通过任务的具体ID:
        scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
        scheduler.remove_job('my_job_id')
    

    9.暂停和恢复

    	通过任务实例或调度器,就能暂停和恢复任务。如果一个任务被暂停了,那么该任务的下一次运行时间就会被移除。在恢复任务前,运行次数计数也不会被统计。
    
        暂停任务,有以下两个方法:
        apscheduler.job.Job.pause()
        apscheduler.schedulers.base.BaseScheduler.pause_job()
    
        恢复任务
        apscheduler.job.Job.resume()
        apscheduler.schedulers.base.BaseScheduler.resume_job()
    

    10.获取任务列表

    	通过get_jobs()就可以获得一个可修改的任务列表。get_jobs()第二个参数可以指定任务储存器名称,那么就会获得对应任务储存器的任务列表。
    	print_jobs()可以快速打印格式化的任务列表,包含触发器,下次运行时间等信息。
    
    修改任务
    	通过apscheduler.job.Job.modify()或modify_job(),你可以修改任务当中除了id的任何属性。
    
    比如:
    	job.modify(max_instances=6, name='Alternate name')
        
    	如果想要重新调度任务(就是改变触发器),你能通过apscheduler.job.Job.reschedule()或reschedule_job()来实现。这些方法会重新创建触发器,并重新计算下次运行时间。
    
    比如:
    	scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
    

    11.管理调度区

    关闭方法如下:
    scheduler.shutdown()
    
    默认情况下,调度器会先把正在执行的任务处理完,再关闭任务储存器和执行器。但是,如果你就直接关闭,你可以添加参数:
    scheduler.shutdown(wait=False)
    上述方法不管有没有任务在执行,会强制关闭调度器。
    

    12.暂停或恢复任务进程

    调度器可以暂停正在执行的任务:
    scheduler.pause()
    
    恢复任务:
    scheduler.resume()
    
    同时,也可以在调度器启动时,默认所有任务设为暂停状态。
    scheduler.start(paused=True)
    

    13.限制任务执行的实例并行数

    	默认情况下,在同一时间,一个任务只允许一个执行中的实例在运行。比如说,一个任务是每5秒执行一次,但是这个任务在第一次执行的时候花了6秒,也就是说前一次任务还没执行完,后一次任务又触发了,由于默认一次只允许一个实例执行,所以第二次就丢失了。为了杜绝这种情况,可以在添加任务时,设置max_instances参数,为指定任务设置最大实例并行数。
    	job.modify(max_instances=6, name='Alternate name')
    

    14.对视任务的执行与合并

    有时,任务会由于一些问题没有被执行。最常见的情况就是,在数据库里的任务到了该执行的时间,但调度器被关闭了,那么这个任务就成了“哑弹任务”。错过执行时间后,调度器才打开了。这时,调度器会检查每个任务的misfire_grace_time参数int值,即哑弹上限,来确定是否还执行哑弹任务(这个参数可以全局设定的或者是为每个任务单独设定)。此时,一个哑弹任务,就可能会被连续执行多次。
    
    但这就可能导致一个问题,有些哑弹任务实际上并不需要被执行多次。coalescing合并参数就能把一个多次的哑弹任务揉成一个一次的哑弹任务。也就是说,coalescing为True能把多个排队执行的同一个哑弹任务,变成一个,而不会触发哑弹事件。
    
    注!如果是由于线程池/进程池满了导致的任务延迟,执行器就会跳过执行。要避免这个问题,可以添加进程或线程数来实现或把 misfire_grace_time值调高。
    

    15.调度器时间

    调度器允许添加事件侦听器。部分事件会有特有的信息,比如当前运行次数等。add_listener(callback,mask)中,第一个参数是回调对象,mask是指定侦听事件类型,mask参数也可以是逻辑组合。回调对象会有一个参数就是触发的事件。
    
    具体可以查看文档中events模块,里面有关于事件类型以及事件参数的详细说明。
    
    def my_listener(event):
        if event.exception:
            print('The job crashed :(')
        else:
            print('The job worked :)')
    
    # 当任务执行完或任务出错时,调用my_listener
    scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
    

    事件类型

    Constant Description Event class
    EVENT_SCHEDULER_STARTED 调度器被开启 SchedulerEvent
    EVENT_SCHEDULER_SHUTDOWN 调度器被关闭 SchedulerEvent
    EVENT_SCHEDULER_PAUSED 任务进程被停暂停 SchedulerEvent
    EVENT_SCHEDULER_RESUMED 任务进程被停唤醒 SchedulerEvent
    EVENT_EXECUTOR_ADDED 执行器被添加进入调度器 SchedulerEvent
    EVENT_EXECUTOR_REMOVED 执行器被移除进入调度器 SchedulerEvent
    EVENT_JOBSTORE_ADDED 作业存储被添加到调度程序 SchedulerEvent
    EVENT_JOBSTORE_REMOVED 作业存储移除调度程序 SchedulerEvent
    EVENT_ALL_JOBS_REMOVED All jobs were removed from either all job stores or one particular job store SchedulerEvent
    EVENT_JOB_ADDED A job was added to a job store JobEvent
    EVENT_JOB_REMOVED A job was removed from a job store JobEvent
    EVENT_JOB_MODIFIED A job was modified from outside the scheduler JobEvent
    EVENT_JOB_SUBMITTED A job was submitted to its executor to be run JobSubmissionEvent
    EVENT_JOB_MAX_INSTANCES A job being submitted to its executor was not accepted by the executor because the job has already reached its maximum concurrently executing instances JobSubmissionEvent
    EVENT_JOB_EXECUTED A job was executed successfully JobExecutionEvent
    EVENT_JOB_ERROR A job raised an exception during execution JobExecutionEvent
    EVENT_JOB_MISSED A job’s execution was missed JobExecutionEvent
    EVENT_ALL A catch-all mask that includes every event type N/A

    16.异常捕获

    通过logging模块,可以添加apscheduler日志至DEBUG级别,这样就能捕获异常信息。
    
    关于logging初始化的方式如下:
    
    import logging
    
    logging.basicConfig()
    logging.getLogger('apscheduler').setLevel(logging.DEBUG)
    日志会提供很多调度器的内部运行信息。
    

    2.apscheduler使用:

    1.不带参

    from apscheduler.schedulers.blocking import  BlockingScheduler
    import datetime
    def aps_test():
        print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    
    
    scheduler = BlockingScheduler()
    scheduler.add_job(aps_test, trigger='cron', second='*/2')
    scheduler.start()
    看代码,定义一个函数,然后定义一个scheduler类型,添加一个job,然后执行,就可以了,代码是不是超级简单,而且非常清晰。看看结果吧。
    5秒整倍数,就执行这个函数,是不是超级超级简单?对了,apscheduler就是通俗易懂。
    

    2.传参

    def aps_test(str1):
        print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + str1)
    
    
    scheduler = BlockingScheduler()
    scheduler.add_job(aps_test, trigger='cron', second='*/2', args=["你好"])
    scheduler.start()
    好了,上面只是给大家看的小例子,我们先从头到位梳理一遍吧。apscheduler分为4个模块,分别是Triggers,Job stores,Executors,Schedulers.从上面的例子我们就可以看出来了,triggers就是触发器,上面的代码中,用了cron,其实还有其他触发器,看看它的源码解释。
    The ``trigger`` argument can either be:
              #. the alias name of the trigger (e.g. ``date``, ``interval`` or ``cron``), in which case
                any extra keyword arguments to this method are passed on to the trigger's constructor
              #. an instance of a trigger class
    源码中解释说,有date, interval, cron可供选择,其实看字面意思也可以知道,date表示具体的一次性任务,interval表示循环任务,cron表示定时任务,好了,分别写个代码看看效果最明显。
    

    3.三种触发器

    def aps_test(str1):
        print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + str1)
    
    
    scheduler = BlockingScheduler()
    scheduler.add_job(aps_test, trigger='cron', second='*/2', args=["定时调度器"])
    scheduler.add_job(aps_test, trigger='date', next_run_time=datetime.datetime.now() +
                                                              datetime.timedelta(seconds=12), args=["一次性"])
    scheduler.add_job(aps_test, trigger='interval', seconds=3, args=["循环调度器"])
    scheduler.start()
    结果图如下:
    
    5 4.添加日志功能
    from apscheduler.schedulers.blocking import  BlockingScheduler
    import datetime
    import logging
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S',
                        filename='log1.txt',
                        filemode='a')
    def aps_test(str1):
        print(1/0)
        print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + str1)
    scheduler = BlockingScheduler()
    scheduler._logger = logging
    scheduler.add_job(aps_test, trigger='cron', second='*/2', args=["定时调度器"])
    scheduler.start()
    

    5.删除任务

    from apscheduler.schedulers.blocking import  BlockingScheduler
    import datetime
    
    def aps_test(str1):
        print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + str1)
    def aps_test2(str1):
        scheduler.remove_job("interval_task")
        print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + str1)
    scheduler = BlockingScheduler()
    scheduler.add_job(aps_test, trigger='cron', second='*/2', args=["定时调度器"])
    scheduler.add_job(aps_test2, trigger='date', next_run_time=datetime.datetime.now() +
                      datetime.timedelta(seconds=12), args=["一次性 删除任务"], id="data_task")
    scheduler.add_job(aps_test, trigger='interval', seconds=3, args=["循环调度器"], id="interval_task")
    scheduler.start()
    在运行过程中,成功删除某一个任务,其实就是为每个任务定义一个id,然后remove_job这个id
    

    6.停止与恢复任务

    from apscheduler.schedulers.blocking import  BlockingScheduler
    import datetime
    import logging
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S',
                        filename='log1.txt',
                        filemode='a')
    def aps_test(str1):
        print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + str1)
    
    
    def aps_pause(x):
        scheduler.pause_job('interval_task')
        print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + x)
    
    
    def aps_resume(x):
        scheduler.resume_job('interval_task')
        print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + x)
    
    
    scheduler = BlockingScheduler()
    # scheduler.add_job(aps_test, trigger='cron', second='*/2', args=["定时调度器"])
    scheduler.add_job(func=aps_pause, args=('一次性任务,停止循环任务',), next_run_time=datetime.datetime.now() +
                      datetime.timedelta(seconds=3), id='pause_task')
    scheduler.add_job(func=aps_resume, args=('一次性任务,恢复循环任务',), next_run_time=datetime.datetime.now() +
                      datetime.timedelta(seconds=6), id='resume_task')
    scheduler.add_job(aps_test, trigger='interval', seconds=1, args=["循环调度器"], id="interval_task")
    scheduler._logger = logging
    scheduler.start()
    
    输出结果:
        2020-06-02 15:09:27循环调度器
        2020-06-02 15:09:28循环调度器
        2020-06-02 15:09:29一次性任务,停止循环任务
        2020-06-02 15:09:32一次性任务,恢复循环任务
        2020-06-02 15:09:33循环调度器
        2020-06-02 15:09:34循环调度器
        2020-06-02 15:09:35循环调度器
    

    7.异常处理

    from apscheduler.schedulers.blocking import  BlockingScheduler
    import datetime
    import logging
    from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S',
                        filename='log1.txt',
                        filemode='a')
    def aps_test(str1):
        print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + str1)
    def aps_pause(x):
        print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + x)
        print(1/0)
    def my_listener(event):
        if event.exception:
            print('任务出错了!!!!!!')
        else:
            print('任务照常运行...')
    scheduler = BlockingScheduler()
    scheduler.add_job(func=aps_pause, args=('一次性任务,会报错',), next_run_time=datetime.datetime.now() +
                      datetime.timedelta(seconds=3), id='pause_task')
    scheduler.add_job(aps_test, trigger='interval', seconds=1, args=["循环调度器"], id="interval_task")
    scheduler.add_listener(my_listener, EVENT_JOB_ERROR | EVENT_JOB_EXECUTED)
    scheduler._logger = logging
    scheduler.start()
    
    
    总结:
        这个监听相当于一个回调函数,当一切正常照常运行,出错之后,我们可以给将消息发送给管理员
        
    输出结果:
        2020-06-02 15:25:35循环调度器
        任务照常运行...
        2020-06-02 15:25:36循环调度器
        任务照常运行...
        2020-06-02 15:25:37一次性任务,会报错
        任务出错了!!!!!!
        2020-06-02 15:25:37循环调度器
        任务照常运行...
        2020-06-02 15:25:38循环调度器
        任务照常运行...
        2020-06-02 15:25:39循环调度器
        任务照常运行...
    

    参考:

    https://www.cnblogs.com/yueerwanwan0204/p/5480870.html # 初学比较友好

    http://www.chenxm.cc/article/829.html # 内容稍微复杂一点

  • 相关阅读:
    Path Sum II
    Convert Sorted Array to Binary Search Tree
    Construct Binary Tree from Inorder and Postorder Traversal
    Construct Binary Tree from Preorder and Inorder Traversal
    Maximum Depth of Binary Tree
    Binary Tree Zigzag Level Order Traversal
    Binary Tree Level Order Traversal
    Same Tree
    Validate Binary Search Tree
    Binary Tree Inorder Traversal
  • 原文地址:https://www.cnblogs.com/liuzhanghao/p/13072867.html
Copyright © 2011-2022 走看看