1 基本概念
使用 Celery 实现定时任务的步骤:
(1) 创建一个 Celery 实例
(2) 配置文件中配置任务 ,发布任务 celery A xxx beat
(3) 启动 Celery Worker
(4) 存储结果
使用 Celery 实现异步任务的步骤:
(1) 创建一个 Celery 实例
(2) 启动 Celery Worker ,通过delay() 或 apply_async()(delay 方法封装了 apply_async, apply_async支持更多的参数 ) 将任务发布到broker
(3) 应用程序调用异步任务
(4)存储结果 (发布的任务需要return才会有结果,否则为空)
2 celery定时任务简单使用
目录结构如下
celeryconfig.py
from __future__ import absolute_import # 拒绝隐式引入,因为celery.py的名字和celery的包名冲突,需要使用这条语句让程序正确地运行 from celery.schedules import crontab from datetime import timedelta broker_url = "redis://127.0.0.1:6379/2" # 使用redis存储任务队列 result_backend = "redis://127.0.0.1:6379/6" # 使用redis存储结果 task_serializer = 'json' result_serializer = 'json' accept_content = ['json'] timezone = "Asia/Shanghai" # 时区设置 worker_hijack_root_logger = False # celery默认开启自己的日志,可关闭自定义日志,不关闭自定义日志输出为空 result_expires = 60 * 60 * 24 # 存储结果过期时间(默认1天) # 导入任务所在文件 imports = [ "celery_task.epp_scripts.test1", # 导入py文件 "celery_task.epp_scripts.test2", ] # 需要执行任务的配置 beat_schedule = { "test1": { "task": "celery_task.epp_scripts.test1.celery_run", #执行的函数 "schedule": crontab(minute="*/1"), # every minute 每分钟执行 "args": () # # 任务函数参数 }, "test3": { "task": "celery_task.epp_scripts.test1.celery_run", # 执行的函数 "schedule": timedelta(seconds=10), # 每10s执行一次 "args": () # # 任务函数参数 }, "test2": { "task": "celery_task.epp_scripts.test2.celery_run", "schedule": crontab(minute=0, hour="*/1"), # every minute 每小时执行 "args": () }, } # "schedule": crontab()与crontab的语法基本一致 # "schedule": crontab(minute="*/10", # 每十分钟执行 # "schedule": crontab(minute="*/1"), # 每分钟执行 # "schedule": crontab(minute=0, hour="*/1"), # 每小时执行
celery.py
# coding:utf-8 from __future__ import absolute_import # 拒绝隐式引入,因为celery.py的名字和celery的包名冲突,需要使用这条语句让程序正确地运行 from celery import Celery # 创建celery应用对象 app = Celery("celery_demo") # 导入celery的配置信息 app.config_from_object("celery_task.celeryconfig")
tes1.py
# coding:utf-8 from __future__ import absolute_import # 拒绝隐式引入,因为celery.py的名字和celery的包名冲突,需要使用这条语句让程序正确地运行 from celery import Celery # 创建celery应用对象 app = Celery("celery_demo") # 导入celery的配置信息 app.config_from_object("celery_task.celeryconfig")
test2.py
from celery_task.celery import app def test33(): print("test33----------------") # print("------"*50) def test44(): print("test44--------------") # print("------" * 50) test33() @app.task def celery_run(): test33() test44()
3 启动
先在一个终端: celery -A celery_task beat 发布任务,在celery_task 同级目录下执行
再在另一个终端:celery -A pinduoduo worker -l info -P eventlet 开启worker 在celery_task 同级目录下执行
需要注意,上面的参数 -P eventlet 主要是为了解决在win10 下报 Celery ValueError: not enough values to unpack (expected 3, got 0) 的错误,需要安装eventlet先
pip install eventlet -i https://pypi.douban.com/simple/
参考:https://blog.csdn.net/Shyllin/article/details/80940643
参考:https://blog.csdn.net/qq_30242609/article/details/79047660