Celery的相关模块
-
包含异步任务和定时任务. 其中, 异步任务通常在业务逻辑中被触发并发往任务队列, 而定时任务由 Celery Beat 进程周期性地将任务发往任务队列.
-
消息中间件 Broker(中间人)
Broker, 即为任务调度队列, 接收任务生产者发来的消息(即任务), 将任务存入队列. Celery 本身不提供队列服务, 官方推荐使用 RabbitMQ 和 Redis 等.
-
任务执行单元 Worker
Worker 是执行任务的处理单元, 它实时监控消息队列, 获取队列中调度的任务, 并执行它.
-
任务结果存储 Backend
Backend 用于存储任务的执行结果, 以供查询. 同消息中间件一样, 存储也可使用 RabbitMQ, Redis 和 MongoDB 等
我在使用中,使用在Django的上传文件到oss服务器中,由于改操作不影响用户后续操作,如果让用户等带来不好的用户体验,可以用Celery进行异步操作。
Celery其实是一个独立的进程,开辟了一条独立的进程用于接受任务模块提交过来的任务。
上一个项目示意图
首相需要安装运行环境,
pip install 'celery[redis]' #此安装模式告知,redis依赖安装在celery下
开始必须运行本地redis
先上一个最简单的Celery代码。
import time from celery import Celery broker = 'redis://127.0.0.1:6379/0' backend = 'redis://127.0.0.1:6379/0' app = Celery('my_task', broker=broker, backend=backend) @app.task def add(x, y): time.sleep(5) # 模拟耗时操作 return x + y
将文件保存为task.py
在Python环境下运行命令
celery worker -A task --loglevel=info 执行Celery
task是文件的文件名,--loglevel=info为显示的日志信息。
在环境中运行add
In [24]: add Out[24]: <@task: task.add of my_task at 0x1083e7250>
add已经被装饰过,实际已经成为了一个task任务
如果直接执行add,其实跟运行普通函数没什么区别。
应该执行add.delay(2, 3),这样的话,才回执行Celery任务。
可以用变量接收ret = add.delay(2.4)
等待ret运行的任务执行出结果以后,ret.result可以返回运行结果。
Djando Celery实战,
首先新建一个独立的文件夹worker
里面新建__init__.py与config.py
在__init__文件里面实例化Celery,在config.py里面填写相关设置信息。
import os from celery import Celery from worker import config # 加载Django 的 setting os.environ.setdefault("DJANGO_SETTINGS_MODULE", "swiper.settings") celery_app = Celery('worker') # 实例话一个celery celery_app.config_from_object(config) # 加载相关配置 celery_app.autodiscover_tasks() #自动监听任务
config.py
broker_url = 'redis://127.0.0.1:6379/0' broker_pool_limit = 10 # Borker 连接池, 默认是10 timezone = 'Asia/Shanghai' accept_content = ['pickle', 'json'] task_serializer = 'pickle' result_expires = 3600 # 任务过期时间 result_backend = 'redis://127.0.0.1:6379/0' result_serializer = 'pickle' result_cache_max = 10000 # 任务结果最大缓存数量 worker_redirect_stdouts_level = 'INFO' #日志级别
在需要异步操作的函数上添加@celery_app.task
启动Celery
celery worker -A worker --loglevel=info
执行具体的接口函数:xxx.delay()
后续有待结果的疑惑,如何进行多worker设置,还有就是result的取出问题,如果异步操作直接取出将是空,很多时候该操作还没返回值,worker还在运行中。
有空研究一下,redis里面的result_backend里面的相关内容信息。