celery beat是用来开启定时任务调度的,一般用法为:启动celery beat,然后启动worker,让beat去调用worker里面的任务
一般我们在代码里面通过model层的插入直接就可以新建定时任务
schedule,created = IntervalSchedule.objects.get_or_create( every = 10, period = IntervalSchedule.SECONDS ) PeriodicTask.objects.create( interval = schedule, name = random.random(), task = "adv_celery.tasks.task1.tasks.test", #添加参数 args = json.dumps(["hello "]), kwargs = json.dumps({}), #expires = datetime.datetime.utcnow() + datetime.timedelta(seconds=30) )
由于我们使用的数据库插入模式,记得配置
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
然后我们在adv_celery.tasks.task1.tasks这个文件里面建立一个超级简单的test任务,对应它的路径
task = "adv_celery.tasks.task1.tasks.test",
这个任务这样就可以
@shared_task def test(arg): print("312")
然后开启celerybeat,
celery -A 你的应用 beat
再开启worker
celery -A 你的应用 worker
为了防止爆发
MySQL backend does not support timezone-ae datetimes when USE_TZ is False.
问题,在django的setting.py设置
CELERY_ENABLE_UTC = False
DJANGO_CELERY_BEAT_TZ_AWARE = False
在任务中获取异步任务id
@task
def do_job(path): cache.set(do_job.request.id, operation_results)
在执行异步任务时即拿到任务
task = task_addnums.delay()
task_id = task.id
根据id获取任务结果
from celery.result import AsyncResult res=AsyncResult("62051878-ca77-4895-a61f-6f9525681347") # 参数为task id res.result
启动以后
redis打印的日志
[6772] 28 Oct 09:37:08.306 * Background saving terminated with success [6772] 28 Oct 09:42:09.064 * 100 changes in 300 seconds. Saving... [6772] 28 Oct 09:42:09.065 * Background saving started by pid 12368