  • Learning Python APScheduler

    Installation

    pip install apscheduler
    

    Components

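    • Triggers: a trigger contains the scheduling logic. Each job has its own trigger, which determines when the job should run next. Beyond their initial configuration, triggers are completely stateless.
    • Job stores: a job store holds the scheduled jobs. The default job store keeps jobs in memory; other job stores persist them to a database, serializing a job's data when it is saved and deserializing it when it is loaded. Job stores other than the default one do not keep job data in memory; they act as middlemen for saving, loading, updating and searching jobs in the backend.
    • Executors: an executor handles the running of jobs, typically by submitting the job's designated callable to a thread pool or process pool. When the job is done, the executor notifies the scheduler, which then emits the corresponding event.
    • Schedulers: the scheduler binds the other components together. An application usually runs only one scheduler. Developers do not normally deal with job stores, executors or triggers directly; the scheduler provides the interfaces for handling them.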

    Basic usage

    Adding jobs

    from datetime import datetime
    from apscheduler.schedulers.blocking import BlockingScheduler
    sched = BlockingScheduler()
    
    def job():
        print("job %s" % datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    
    @sched.scheduled_job('interval', seconds=5)
    def job_2():
        print("job2 %s" % datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    
    # Run job() every 5 seconds
    sched.add_job(job, 'interval', seconds=5)
    
    sched.start()
    

    Removing jobs

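    # Give the job an id, then remove it by that id
    sched.add_job(job, "interval", minutes=1, id="job")
    sched.remove_job("job")
    # Or keep the Job instance returned by add_job()
    add_job = sched.add_job(job, 'interval', seconds=5)
    # and remove the job through that instance
    add_job.remove()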
    

    Pausing and resuming

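    # Pause: call pause() on the Job instance returned by add_job(),
    # or pass the job id to the scheduler's pause_job()
    paused_job = sched.add_job(job, 'interval', seconds=5, id='job_to_pause')
    paused_job.pause()                # apscheduler.job.Job.pause()
    sched.pause_job('job_to_pause')   # apscheduler.schedulers.base.BaseScheduler.pause_job()
    # Resume: the same two routes
    paused_job.resume()               # apscheduler.job.Job.resume()
    sched.resume_job('job_to_pause')  # apscheduler.schedulers.base.BaseScheduler.resume_job()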
    

    Getting jobs

    sched.get_job(job_id='123')
    sched.get_jobs()
    

    Shutting down the scheduler

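    # By default the scheduler shuts down its job stores and executors and waits for all currently running jobs to finish. If you do not want to wait, set the wait option to False.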
    sched.shutdown()
    
    sched.shutdown(wait=False)
    

    Triggers

    cron

    (int|str) means the argument may be either an int or a str
    (datetime|str) means the argument may be either a datetime or a str
     
    year (int|str) – 4-digit year, e.g. 2008
    month (int|str) – month (1-12)
    day (int|str) – day of the month (1-31)
    week (int|str) – ISO week (1-53); for example, 31 December 2006 in the Gregorian calendar is written 2006-W52-7 in extended form or 2006W527 in compact form
    day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
    hour (int|str) – hour (0-23)
    minute (int|str) – minute (0-59)
    second (int|str) – second (0-59)
    start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
    end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
    timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)
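
    The ISO week example for the week field can be checked with the standard library alone. The sketch below does not use APScheduler; it only reproduces the two notations. It also prints the numeric weekday, on the understanding that APScheduler numbers day_of_week from 0 for Monday, the same as Python's date.weekday().

```python
from datetime import date

d = date(2006, 12, 31)
iso_year, iso_week, iso_weekday = d.isocalendar()

# ISO 8601 week date in extended and compact form
extended = f"{iso_year}-W{iso_week:02d}-{iso_weekday}"
compact = f"{iso_year}W{iso_week:02d}{iso_weekday}"

# 0 = Monday ... 6 = Sunday
weekday_number = d.weekday()

print(extended, compact, weekday_number)  # 2006-W52-7 2006W527 6
```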
    
    • demo
    # Run once at 17:19:07 on 22 March 2017
    sched.add_job(job, 'cron', year=2017, month=3, day=22, hour=17, minute=19, second=7)
     
    # Run at 00:00, 01:00, 02:00 and 03:00 on the third Friday of June, July, August, November and December
    sched.add_job(job, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
     
    # Run Monday to Friday at 5:30 AM until 2014-05-30 00:00:00
    sched.add_job(job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
     
    # Run every 5 seconds, comparable to the interval trigger with seconds=5
    sched.add_job(job, 'cron', second='*/5')
    

    interval: interval-based scheduling (runs repeatedly)

    weeks (int) – number of weeks to wait
    days (int) – number of days to wait
    hours (int) – number of hours to wait
    minutes (int) – number of minutes to wait
    seconds (int) – number of seconds to wait
    start_date (datetime|str) – starting point for the interval calculation
    end_date (datetime|str) – latest possible date/time to trigger on
    timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations
    
    • demo
    # Run the job every 3 days, 17 hours, 19 minutes and 7 seconds
    sched.add_job(job, 'interval', days=3, hours=17, minutes=19, seconds=7)
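
    The interval arguments add up to a single period rather than describing a time of day. The arithmetic can be sketched with the standard library's timedelta; this does not call APScheduler, and it assumes that, with no start_date given, the first run happens one full period after the job is added. The start value is an arbitrary example.

```python
from datetime import datetime, timedelta

period = timedelta(days=3, hours=17, minutes=19, seconds=7)

# Pretend the job was added at this moment
start = datetime(2024, 1, 1, 0, 0, 0)
runs = [start + period * n for n in range(1, 4)]

print(period.total_seconds())  # 321547.0
for run in runs:
    print(run)
# 2024-01-04 17:19:07
# 2024-01-08 10:38:14
# 2024-01-12 03:57:21
```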
    

    date: one-off scheduling (runs only once)

    run_date (datetime|str) – the date/time to run the job at
    timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already
    
    • demo
    from datetime import date, datetime

    # The job will be executed on November 6th, 2009
    # (job() as defined earlier takes no parameters, so no args are passed)
    sched.add_job(job, 'date', run_date=date(2009, 11, 6))
    # The job will be executed on November 6th, 2009 at 16:30:05
    sched.add_job(job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5))
    

    Schedulers

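    • BlockingScheduler: use it when the scheduler is the only thing running in your process
    • BackgroundScheduler: use it when you are not using any of the frameworks below and want the scheduler to run quietly in the background inside your application
    • AsyncIOScheduler: use it if your application uses the asyncio module
    • GeventScheduler: use it if your application uses gevent
    • TornadoScheduler: use it if you are building a Tornado application
    • TwistedScheduler: use it if you are building a Twisted application
    • QtScheduler: use it if you are building a Qt application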

    Official examples

    1. executors/processpool.py

    from datetime import datetime
    from apscheduler.schedulers.blocking import BlockingScheduler
    from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
    
    HIT_NUMBER = 0
    
    executors = {
        'default': ThreadPoolExecutor(20),
        'processpool': ProcessPoolExecutor(1)
    }
    
    def hit():
        global HIT_NUMBER
        HIT_NUMBER += 1
        print(f'Hit {HIT_NUMBER}! The time is {datetime.now()} ...')
    
    
    if __name__ == '__main__':
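        scheduler = BlockingScheduler()
        # Register both executors before starting; configure() replaces any
        # executors added earlier, so a prior add_executor() call would be lost
        scheduler.configure(executors=executors)
        # Runs on the 'default' thread pool; pass executor='processpool' to use the process pool
        job = scheduler.add_job(hit, 'interval', seconds=1, id='hit')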
    
        print('Press Ctrl+C to exit')
        try:
            scheduler.start()
        except (KeyboardInterrupt, SystemExit):
            scheduler.shutdown()
    

    2. jobstores/sqlalchemy.py

    from datetime import datetime, timedelta
    
    from apscheduler.schedulers.blocking import BlockingScheduler
    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
    
    jobstores = {
        'sqlalchemy': SQLAlchemyJobStore(url='sqlite:///sqlalchemy_default.sqlite')
    }
    
    
    def alarm(time):
        print('Alarm! This alarm was scheduled at %s.' % time)
    
    
    if __name__ == '__main__':
        scheduler = BlockingScheduler()
        scheduler.configure(jobstores=jobstores)
        scheduler.add_executor('processpool')
        alarm_time = datetime.now() + timedelta(seconds=10)
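        # Put the job in the 'sqlalchemy' store configured above rather than the default in-memory store
        scheduler.add_job(alarm, 'date', run_date=alarm_time, args=[datetime.now()], jobstore='sqlalchemy')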
    
        print('Press Ctrl+C to exit')
        try:
            scheduler.start()
        except (KeyboardInterrupt, SystemExit):
            scheduler.shutdown()
    

    3. misc/reference.py

    import os
    
    from apscheduler.schedulers.blocking import BlockingScheduler
    
    
    if __name__ == '__main__':
        scheduler = BlockingScheduler()
        # The job target is given as a textual reference of the form 'module:attribute'
        scheduler.add_job('sys:stdout.write', 'interval', seconds=3, args=['tick\n'])
        print('Press Ctrl+C to exit')
    
        try:
            scheduler.start()
        except (KeyboardInterrupt, SystemExit):
            pass
    

    4.1 rpc/server.py

    import rpyc
    from rpyc.utils.server import ThreadedServer
    
    from apscheduler.schedulers.background import BackgroundScheduler
    
    
    def print_text(text):
        print(text)
    
    
    class SchedulerService(rpyc.Service):
        def exposed_add_job(self, func, *args, **kwargs):
            return scheduler.add_job(func, *args, **kwargs)
    
        def exposed_modify_job(self, job_id, jobstore=None, **changes):
            return scheduler.modify_job(job_id, jobstore, **changes)
    
        def exposed_reschedule_job(self, job_id, jobstore=None, trigger=None, **trigger_args):
            return scheduler.reschedule_job(job_id, jobstore, trigger, **trigger_args)
    
        def exposed_pause_job(self, job_id, jobstore=None):
            return scheduler.pause_job(job_id, jobstore)
    
        def exposed_resume_job(self, job_id, jobstore=None):
            return scheduler.resume_job(job_id, jobstore)
    
        def exposed_remove_job(self, job_id, jobstore=None):
            scheduler.remove_job(job_id, jobstore)
    
        def exposed_get_job(self, job_id):
            return scheduler.get_job(job_id)
    
        def exposed_get_jobs(self, jobstore=None):
            return scheduler.get_jobs(jobstore)
    
    
    if __name__ == '__main__':
        scheduler = BackgroundScheduler()
        scheduler.start()
        protocol_config = {'allow_public_attrs': True}
        server = ThreadedServer(SchedulerService, port=12345, protocol_config=protocol_config)
        try:
            server.start()
        except (KeyboardInterrupt, SystemExit):
            pass
        finally:
            scheduler.shutdown()
    

    4.2 rpc/client.py

    from time import sleep
    
    import rpyc
    
    
    conn = rpyc.connect('localhost', 12345)
    job = conn.root.add_job('server:print_text', 'interval', args=['Hello, World'], seconds=2)
    sleep(10)
    conn.root.remove_job(job.id)
    

    5. schedulers/blocking.py

    from datetime import datetime
    import os
    
    from apscheduler.schedulers.blocking import BlockingScheduler
    
    
    def tick():
        print('Tick! The time is: %s' % datetime.now())
    
    
    if __name__ == '__main__':
        scheduler = BlockingScheduler()
        scheduler.add_job(tick, 'interval', seconds=3)
        print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
    
        try:
            scheduler.start()
        except (KeyboardInterrupt, SystemExit):
            pass
    

    6. Tornado scenario

    from datetime import timedelta
    from tornado import gen, ioloop, queues
    from tornado.httpclient import AsyncHTTPClient, HTTPError, HTTPRequest
    from lxml import etree
    from urllib.parse import urljoin
    import re
    from apscheduler.schedulers.tornado import TornadoScheduler
    from collections.abc import Iterator
    
    
    class Response:
    
        def __init__(self, response, node):
            self.response = response
            self.node = node
    
        @property
        def meta(self):
            return self.node.meta
    
        @property
        def content(self) -> bytes:
            return self.response.body
    
        @property
        def url(self):
            return self.response.effective_url
    
        @property
        def headers(self):
            return self.response.headers
    
        @property
        def reason(self):
            return self.response.reason
    
        @property
        def request(self):
            return self.response.request
    
        @property
        def request_time(self):
            return self.response.request_time
    
        @property
        def rethrow(self):
            return self.response.rethrow
    
        @property
        def time_info(self):
            return self.response.time_info
    
        @property
        def code(self):
            return self.response.code
    
        @property
        def text(self):
            return self.response.body.decode("utf-8", "ignore")
    
        def xpath(self, pattern):
            if not hasattr(self, "html"):
                self.html = etree.HTML(self.text)
            return self.html.xpath(pattern)
    
    
    class Node:
    
        def __init__(self, url, callback, meta=None):
            self.url = url
            self.callback = callback
            self.meta = meta
    
    
    class AsyCrawler:
        start_url = []
    
        def __init__(self, concurrency=3):
            self.queue = queues.Queue()
            self.concurrency = concurrency
    
        def clean_bank(self, txt):
            # Join the text fragments and strip all whitespace
            return re.sub(r"\s", "", "".join(txt))
    
        def get_start_url(self):
            for start_url in self.start_url:
                yield start_url
    
        @gen.coroutine
        def fetch(self, url, retry=5, **kwargs):
            for i in range(retry):
                try:
                    request = HTTPRequest(url=url, **kwargs)
                    response = yield AsyncHTTPClient().fetch(request)
                except HTTPError as e:
                    print("http error %s with url = %s retry = %s" % (e.code, url, i + 1))
                    if e.code in (403, 404):
                        break
                    continue
                except Exception as e:
                    print("error = %s url = %s" % (e, url))
                    break
                raise gen.Return(response)
            raise gen.Return(None)
    
        def handle(self, response):
            pass
    
        def run(self):
            for start_url in self.get_start_url():
                node = Node(url=start_url, callback=self.handle)
                self.queue.put_nowait(node)
    
            @gen.coroutine
            def runner():
                for _ in range(self.concurrency):
                    worker()
                yield self.queue.join(timeout=timedelta(days=1))
    
            @gen.coroutine
            def worker():
                while True:
                    node = yield self.queue.get()
                    response = yield self.fetch(node.url)
                    if response is None:
                        self.queue.task_done()
                        continue
                    callback = node.callback(Response(response, node))
                    if isinstance(callback, Iterator):
                        for next_node in callback:
                            self.queue.put_nowait(next_node)
                            print("next node = %s" % next_node.url)
                    self.queue.task_done()
    
            ioloop.IOLoop.current().add_callback(runner)
    
    
    class TouTiaoCrawler(AsyCrawler):
        start_url = ["https://toutiao.io/prev/2019-11-14"]
    
        def handle(self, response):
            for post in response.xpath("//div[@class='post']"):
                title = post.xpath("./div[@class='content']//h3[@class='title']//a//@title")
                meta = post.xpath("./div[@class='content']/div[@class='meta']//text()")
                upvote = post.xpath("./div[contains(@class, 'upvote')]/a/span//text()")
                info_href = post.xpath("./div[@class='content']//h3[@class='title']//a//@href")[0]
                info_href = urljoin("https://toutiao.io/", info_href)
                title = self.clean_bank(title)
                meta = self.clean_bank(meta)
                upvote = self.clean_bank(upvote)
                print(title, meta, upvote, info_href)
                meta_dict = {
                    "title": title, "upvote": upvote, "meta": meta
                }
                yield Node(url=info_href, callback=self.handle_info, meta=meta_dict)
    
        def handle_info(self, response):
            title = response.xpath("//title//text()")
            title = self.clean_bank(title)
            data = response.meta
            data['url'] = response.url
            data['info_title'] = title
            print(data)
    
    
    if __name__ == '__main__':
        scheduler = TornadoScheduler()
        toutiao = TouTiaoCrawler()
        scheduler.add_job(toutiao.run, trigger="cron", second="*/10")
        scheduler.start()
        instance = ioloop.IOLoop.instance()
        instance.start()
    
    

    7. Twisted scenario

    from datetime import datetime
    import os
    
    from twisted.internet import reactor
    from apscheduler.schedulers.twisted import TwistedScheduler
    
    
    def tick():
        print('Tick! The time is: %s' % datetime.now())
    
    
    if __name__ == '__main__':
        scheduler = TwistedScheduler()
        scheduler.add_job(tick, 'interval', seconds=3)
        scheduler.start()
        print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
    
        # Execution will block here until Ctrl+C (Ctrl+Break on Windows) is pressed.
        try:
            reactor.run()
        except (KeyboardInterrupt, SystemExit):
            pass
    

    Common problems

    Using APScheduler with uWSGI

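    uWSGI uses some tricks that disable the Global Interpreter Lock (GIL) and, with it, the threads that are vital to APScheduler's operation. To fix this, re-enable the GIL with the --enable-threads option.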

  • Original article: https://www.cnblogs.com/iFanLiwei/p/12805170.html