简单使用
pip install apscheduler
组成部分
- 触发器(triggers):触发器包含调度(scheduling)逻辑,每个 job 都有自己的 trigger,用于决定任务何时开始执行。除了初始配置之外,trigger 完全是无状态的。
- 作业存储(job stores):主要保存被调度(scheduling)的任务。默认情况下,job store 会将任务存储在内存中,也可以将任务存储在数据库中,达到持久存储的效果;存储的时候会对数据进行序列化,被加载的时候进行反序列化。除了默认的内存存储之外,job store 不会将 job data 保留在内存中,而是充当中间人,用于在后端保存、加载、更新和搜索任务。
- 执行器(executors):处理 job 是如何执行的。通常将 job 指定的可调用对象提交给线程池或进程池来完成操作。作业完成后,执行器会通知 scheduler(调度程序),调度程序随后会发出相应的事件。
- 调度器(schedulers):将各个组件组织起来。通常应用程序只会运行一个 scheduler。开发人员通常不直接处理 job stores、executors、triggers,scheduler 提供了适当的接口来处理这些。
简单使用
添加任务
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
sched = BlockingScheduler()
def job():
    """Print a ``job <timestamp>`` line using the current local time."""
    print("job %s" % datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
@sched.scheduled_job('interval', seconds=5)
def job_2():
    """Print a ``job2 <timestamp>`` line; registered through the decorator."""
    print("job2 %s" % datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
# Run job() once every 5 seconds, then start the scheduler.
sched.add_job(job, trigger='interval', seconds=5)
sched.start()
删除任务
# Give the job an explicit id, then remove it through the scheduler by id.
sched.add_job(job, trigger="interval", minutes=1, id="job")
sched.remove_job(job_id="job")
# Alternatively keep the object returned by add_job() ...
add_job = sched.add_job(job, trigger='interval', seconds=5)
# ... and remove the job through that object.
add_job.remove()
暂停和恢复
# Pause a job, either through the object returned by add_job()
# (apscheduler.job.Job.pause) or by id through the scheduler
# (apscheduler.schedulers.base.BaseScheduler.pause_job).
pausable_job = sched.add_job(job, 'interval', seconds=5, id='pausable_job')
pausable_job.pause()
sched.pause_job('pausable_job')
# Resume it again, the same two ways
# (Job.resume / BaseScheduler.resume_job).
pausable_job.resume()
sched.resume_job('pausable_job')
获取job
# Look up one job by its id.
sched.get_job('123')
# List the scheduled jobs.
sched.get_jobs()
关闭调度器
# By default the scheduler waits for all currently running jobs to finish
# while shutting down. (The two calls below are alternatives.)
sched.shutdown(wait=True)
# Pass wait=False if you do not want to wait for running jobs.
sched.shutdown(wait=False)
触发器(triggers)
cron
(int|str) 表示参数既可以是int类型,也可以是str类型
(datetime | str) 表示参数既可以是datetime类型,也可以是str类型
year (int|str) – 4-digit year -(表示四位数的年份,如2008年)
month (int|str) – month (1-12) -(表示取值范围为1-12月)
day (int|str) – day of month (1-31) -(表示取值范围为1-31日)
week (int|str) – ISO week (1-53) -(格里历2006年12月31日可以写成2006年-W52-7(扩展形式)或2006W527(紧凑形式))
day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) - (表示一周中的第几天,既可以用0-6表示也可以用其英语缩写表示)
hour (int|str) – hour (0-23) - (表示取值范围为0-23时)
minute (int|str) – minute (0-59) - (表示取值范围为0-59分)
second (int|str) – second (0-59) - (表示取值范围为0-59秒)
start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) - (表示开始时间)
end_date (datetime|str) – latest possible date/time to trigger on (inclusive) - (表示结束时间)
timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone) -(表示时区取值)
- demo
# Run at 2017-03-22 17:19:07.
sched.add_job(job, trigger='cron', year=2017, month=3, day=22, hour=17, minute=19, second=7)
# Run at 00:00, 01:00, 02:00 and 03:00 on the third Friday of
# June, July, August, November and December.
sched.add_job(job, trigger='cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# Run Monday to Friday at 5:30 (AM) until 2014-05-30 00:00:00.
sched.add_job(job, trigger='cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
# Run every 5 seconds; equivalent to the interval trigger with seconds=5.
sched.add_job(job, trigger='cron', second='*/5')
interval 间隔调度(循环执行)
weeks (int) – number of weeks to wait
days (int) – number of days to wait
hours (int) – number of hours to wait
minutes (int) – number of minutes to wait
seconds (int) – number of seconds to wait
start_date (datetime|str) – starting point for the interval calculation
end_date (datetime|str) – latest possible date/time to trigger on
timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations
- demo
# Run the job repeatedly, waiting 3 days, 17 hours, 19 minutes and 7 seconds
# between runs (the values are a period, not a time of day).
sched.add_job(job, trigger='interval', days=3, hours=17, minutes=19, seconds=7)
date定时调度(仅调度一次)
run_date (datetime|str) – the date/time to run the job at -(任务开始的时间)
timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already
- demo
from datetime import date, datetime


def my_job(text):
    """Print the text handed over through ``args``."""
    print(text)


# The job will be executed on November 6th, 2009
sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])
# The job will be executed on November 6th, 2009 at 16:30:05
sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])
调度器(schedulers)
- BlockingScheduler: 如果调度器是你程序中唯一要运行的东西,请选择它
- BackgroundScheduler: 如果你想你的调度器可以在你的应用程序后台静默运行,同时也不打算使用以下任何 Python 框架,请选择它
- AsyncIOScheduler: 如果你的程序使用了 asyncio 库,请使用这个调度器
- GeventScheduler: 如果你的程序使用了 gevent 库,请使用这个调度器
- TornadoScheduler: 如果你打算构建一个 Tornado 程序,请使用这个调度器
- TwistedScheduler: 如果你打算构建一个 Twisted 程序,请使用这个调度器
- QtScheduler: 如果你打算构建一个 Qt 程序,请使用这个调度器
官方实例
1. executors/processpool.py
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
# Number of times hit() has run so far.
HIT_NUMBER = 0
# Executors keyed by the alias they are registered under.
executors = dict(
    default=ThreadPoolExecutor(20),
    processpool=ProcessPoolExecutor(1),
)
def hit():
    """Increment the module-level HIT_NUMBER and print it with the current time."""
    global HIT_NUMBER
    HIT_NUMBER += 1
    print(f'Hit {HIT_NUMBER}! The time is {datetime.now()} ...')
if __name__ == '__main__':
    scheduler = BlockingScheduler()
    # configure() rebuilds the executor table from `executors`, so a separate
    # add_executor('processpool') call before it would simply be discarded.
    scheduler.configure(executors=executors)
    scheduler.add_job(hit, 'interval', seconds=1, id='hit')
    print('Press Ctrl+C to exit')
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()
2. jobstores/sqlalchemy.py
import sys
import os
from datetime import datetime, timedelta
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
# Job stores keyed by alias; the only entry is a SQLite-backed store.
jobstores = dict(
    sqlalchemy=SQLAlchemyJobStore(url='sqlite:///sqlalchemy_default.sqlite'),
)
def alarm(time):
    """Print an alarm message that includes the *time* value passed in."""
    print('Alarm! This alarm was scheduled at %s.' % time)
if __name__ == '__main__':
    scheduler = BlockingScheduler()
    scheduler.configure(jobstores=jobstores)
    scheduler.add_executor('processpool')
    alarm_time = datetime.now() + timedelta(seconds=10)
    # The store is registered under the alias 'sqlalchemy', not 'default', so
    # it must be named explicitly for the job to be saved in it.
    scheduler.add_job(alarm, 'date', run_date=alarm_time, args=[datetime.now()],
                      jobstore='sqlalchemy')
    print('Press Ctrl+C to exit')
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()
3. misc/reference.py
import os
from apscheduler.schedulers.blocking import BlockingScheduler
if __name__ == '__main__':
    scheduler = BlockingScheduler()
    # The callable is given as a textual 'module:attribute' reference.
    scheduler.add_job('sys:stdout.write', 'interval', seconds=3, args=['tick\n'])
    print('Press Ctrl+C to exit')
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass
4.1 rpc/server.py
import rpyc
from rpyc.utils.server import ThreadedServer
from apscheduler.schedulers.background import BackgroundScheduler
def print_text(text):
    """Print *text*; the client schedules it as 'server:print_text'."""
    print(text)
class SchedulerService(rpyc.Service):
    """rpyc service whose ``exposed_*`` methods forward to the module-level ``scheduler``."""

    def exposed_add_job(self, func, *args, **kwargs):
        """Forward to ``scheduler.add_job`` and return its result."""
        return scheduler.add_job(func, *args, **kwargs)

    def exposed_modify_job(self, job_id, jobstore=None, **changes):
        """Forward to ``scheduler.modify_job`` and return its result."""
        return scheduler.modify_job(job_id, jobstore, **changes)

    def exposed_reschedule_job(self, job_id, jobstore=None, trigger=None, **trigger_args):
        """Forward to ``scheduler.reschedule_job`` and return its result."""
        return scheduler.reschedule_job(job_id, jobstore, trigger, **trigger_args)

    def exposed_pause_job(self, job_id, jobstore=None):
        """Forward to ``scheduler.pause_job`` and return its result."""
        return scheduler.pause_job(job_id, jobstore)

    def exposed_resume_job(self, job_id, jobstore=None):
        """Forward to ``scheduler.resume_job`` and return its result."""
        return scheduler.resume_job(job_id, jobstore)

    def exposed_remove_job(self, job_id, jobstore=None):
        """Forward to ``scheduler.remove_job``; returns nothing."""
        scheduler.remove_job(job_id, jobstore)

    def exposed_get_job(self, job_id):
        """Forward to ``scheduler.get_job`` and return its result."""
        return scheduler.get_job(job_id)

    def exposed_get_jobs(self, jobstore=None):
        """Forward to ``scheduler.get_jobs`` and return its result."""
        return scheduler.get_jobs(jobstore)
if __name__ == '__main__':
    # Start the background scheduler first, then serve RPC requests on
    # port 12345 until interrupted.
    scheduler = BackgroundScheduler()
    scheduler.start()
    protocol_config = {'allow_public_attrs': True}
    server = ThreadedServer(SchedulerService, port=12345, protocol_config=protocol_config)
    try:
        server.start()
    except (KeyboardInterrupt, SystemExit):
        pass
    finally:
        # Always stop the scheduler, however the server loop ended.
        scheduler.shutdown()
4.2 rpc/client.py
from time import sleep
import rpyc
# Connect to the scheduler service on localhost:12345.
conn = rpyc.connect('localhost', 12345)
# Schedule the server-side print_text function every 2 seconds.
job = conn.root.add_job(
    'server:print_text',
    'interval',
    args=['Hello, World'],
    seconds=2,
)
# Leave the job scheduled for 10 seconds, then remove it by id.
sleep(10)
conn.root.remove_job(job.id)
5. schedulers/blocking.py
from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingScheduler
def tick():
    """Print a tick message with the current local time."""
    print('Tick! The time is: %s' % datetime.now())
if __name__ == '__main__':
    scheduler = BlockingScheduler()
    scheduler.add_job(tick, 'interval', seconds=3)
    # The exit key combination printed depends on os.name ('nt' -> Break).
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass
6. Tornado场景
import re
from datetime import timedelta
from urllib.parse import urljoin

from apscheduler.schedulers.tornado import TornadoScheduler
from lxml import etree
from tornado import gen, ioloop, queues
from tornado.httpclient import AsyncHTTPClient, HTTPError, HTTPRequest

try:
    # The ABC aliases were removed from ``collections`` in Python 3.10.
    from collections.abc import Iterator
except ImportError:
    from collections import Iterator
class Response:
    """Pair a fetched HTTP response with the Node that requested it.

    Most properties are read-only pass-throughs to the wrapped response
    object; ``meta`` comes from the node.
    """

    def __init__(self, response, node):
        self.response = response
        self.node = node

    @property
    def meta(self):
        """The ``meta`` value carried by the originating node."""
        return self.node.meta

    @property
    def content(self) -> bytes:
        """The wrapped response's ``body``."""
        return self.response.body

    @property
    def url(self):
        """The wrapped response's ``effective_url``."""
        return self.response.effective_url

    @property
    def headers(self):
        return self.response.headers

    @property
    def reason(self):
        return self.response.reason

    @property
    def request(self):
        return self.response.request

    @property
    def request_time(self):
        return self.response.request_time

    @property
    def rethrow(self):
        return self.response.rethrow

    @property
    def time_info(self):
        return self.response.time_info

    @property
    def code(self):
        return self.response.code

    @property
    def text(self):
        """The body decoded as UTF-8, ignoring undecodable bytes."""
        return self.response.body.decode("utf-8", "ignore")

    def xpath(self, pattern):
        """Evaluate an XPath *pattern* against the body parsed as HTML.

        The parsed tree is built on first use and cached on ``self.html``.
        """
        if not hasattr(self, "html"):
            self.html = etree.HTML(self.text)
        return self.html.xpath(pattern)
class Node:
    """A unit of crawl work: a URL, the callback for its response, and optional meta."""

    def __init__(self, url, callback, meta=None):
        self.url = url
        self.callback = callback
        self.meta = meta
class AsyCrawler:
    """Queue-driven asynchronous crawler built on Tornado coroutines.

    Subclasses set ``start_url`` and override ``handle``. A callback may
    return an iterator of further ``Node`` objects to enqueue.
    """

    # Seed URLs; meant to be overridden by subclasses.
    start_url = []

    def __init__(self, concurrency=3):
        self.queue = queues.Queue()
        self.concurrency = concurrency

    def clean_bank(self, txt):
        """Join the text fragments in *txt* and remove all whitespace."""
        return re.sub(r"\s", "", "".join(txt))

    def get_start_url(self):
        """Yield each seed URL in ``start_url``."""
        for start_url in self.start_url:
            yield start_url

    @gen.coroutine
    def fetch(self, url, retry=5, **kwargs):
        """Fetch *url*, retrying up to *retry* times on HTTP errors.

        Extra keyword arguments are passed to ``HTTPRequest``. Returns the
        response, or ``None`` when retries are exhausted, the error code is
        403/404, or any other exception occurs.
        """
        for i in range(retry):
            try:
                request = HTTPRequest(url=url, **kwargs)
                response = yield AsyncHTTPClient().fetch(request)
            except HTTPError as e:
                print("http error %s with url = %s retry = %s" % (e.code, url, i + 1))
                if e.code in (403, 404):
                    # Retrying will not help for these codes.
                    break
                continue
            except Exception as e:
                print("error = %s url = %s" % (e, url))
                break
            # Kept outside the try so the Return exception is not swallowed
            # by the ``except Exception`` clause above.
            raise gen.Return(response)
        raise gen.Return(None)

    def handle(self, response):
        """Default callback for seed URLs; does nothing."""
        pass

    def run(self):
        """Enqueue the seed URLs and schedule the workers on the current IOLoop."""
        for start_url in self.get_start_url():
            node = Node(url=start_url, callback=self.handle)
            self.queue.put_nowait(node)

        @gen.coroutine
        def worker():
            # NOTE(review): workers loop forever and each run() call starts
            # ``concurrency`` new ones - confirm this is intended when run()
            # is scheduled repeatedly.
            while True:
                node = yield self.queue.get()
                try:
                    response = yield self.fetch(node.url)
                    if response is None:
                        continue
                    callback = node.callback(Response(response, node))
                    if isinstance(callback, Iterator):
                        for next_node in callback:
                            self.queue.put_nowait(next_node)
                            print("next node = %s" % next_node.url)
                finally:
                    # Always balance queue.get(), so join() can complete even
                    # if a callback raises.
                    self.queue.task_done()

        @gen.coroutine
        def runner():
            # Workers are started without being awaited; only join() is.
            for _ in range(self.concurrency):
                worker()
            yield self.queue.join(timeout=timedelta(days=1))

        ioloop.IOLoop.current().add_callback(runner)
class TouTiaoCrawler(AsyCrawler):
    """Crawl one toutiao.io listing page and follow each post link."""

    start_url = ["https://toutiao.io/prev/2019-11-14"]

    def handle(self, response):
        """Parse the listing page and yield one Node per post that has a link."""
        for post in response.xpath("//div[@class='post']"):
            title = post.xpath("./div[@class='content']//h3[@class='title']//a//@title")
            meta = post.xpath("./div[@class='content']/div[@class='meta']//text()")
            upvote = post.xpath("./div[contains(@class, 'upvote')]/a/span//text()")
            hrefs = post.xpath("./div[@class='content']//h3[@class='title']//a//@href")
            if not hrefs:
                # No link to follow for this post; skip it instead of
                # failing on hrefs[0].
                continue
            info_href = urljoin("https://toutiao.io/", hrefs[0])
            title = self.clean_bank(title)
            meta = self.clean_bank(meta)
            upvote = self.clean_bank(upvote)
            print(title, meta, upvote, info_href)
            meta_dict = {
                "title": title, "upvote": upvote, "meta": meta
            }
            yield Node(url=info_href, callback=self.handle_info, meta=meta_dict)

    def handle_info(self, response):
        """Add the detail page's URL and title to the node's meta dict and print it."""
        title = response.xpath("//title//text()")
        title = self.clean_bank(title)
        data = response.meta
        data['url'] = response.url
        data['info_title'] = title
        print(data)
if __name__ == '__main__':
    scheduler = TornadoScheduler()
    toutiao = TouTiaoCrawler()
    # Call the crawler's run() every 10 seconds.
    scheduler.add_job(toutiao.run, trigger="cron", second="*/10")
    scheduler.start()
    # IOLoop.instance() is a deprecated alias; use current(), as
    # AsyCrawler.run() does.
    ioloop.IOLoop.current().start()
7.Twisted场景
from datetime import datetime
import os
from twisted.internet import reactor
from apscheduler.schedulers.twisted import TwistedScheduler
def tick():
    """Print a tick message with the current local time."""
    print('Tick! The time is: %s' % datetime.now())
if __name__ == '__main__':
    scheduler = TwistedScheduler()
    scheduler.add_job(tick, 'interval', seconds=3)
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
    # Execution will block here until Ctrl+C (Ctrl+Break on Windows) is pressed.
    try:
        reactor.run()
    except (KeyboardInterrupt, SystemExit):
        pass
常见问题
在uWSGI中使用
uWSGI 使用了一些技巧禁用了 GIL 锁,随之也禁用了线程,而多线程的使用对于 APScheduler 的操作来说至关重要。要解决这个问题,请使用 --enable-threads 选项重新启用 GIL。