任务队列用作一种在线程或计算机之间分配工作的机制。
Celery用Python编写,但是该协议可以用任何语言实现。
只要涉及到第三方服务(发短信,发邮件)的时候,建议用异步队列来实现。
例如有一个四段服务是串行的,第一、二段和第四段是服务器本身的,而第三段是第三方任务,在程序运行时,第三方任务的耗时非常慢,串行的代码实现第三段服务了消耗的时间很长,造成用户体验差。这时候我们就可以使用异步来处理代码。异步就是把一段程序中的某个模块放到另一个进程执行的,在这里我们可以让第三段任务异步处理,让整个程序并行执行(单独把第三段拉出去执行),这样一来服务器本身只执行第一、二、四段。就算第三方任务就算执行时间很长,也不会影响整个代码流程了。
Celery及异步任务的处理:
Celery有两个功能,一个是处理异步任务,一个是处理定时任务。
Celery处理任务的流程有三部分,分别是
- 消息中间件(Broker) -- 消息中间件相当于一个队列,用来存放任务
- 任务执行单元(Celery Worker) -- 负责监听消息中间件
- 结果存储(Backend) -- 存储任务处理结果
任务模块(Task):包含异步任务和定时任务,其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由Celery Beat 进程周期性地将任务发往任务队列。
消息中间件(Broker):负责任务队列调度,接收任务生产者发来的任务,将任务存入队列,Celery本身不提供队列服务,官方推荐使用Redis和RabbitMQ等。
任务执行单元(Worker): Worker是执行任务的处理单元,它实施监控消息队列,获取队列中调度的任务,并执行它。
任务结果存储(Backend): Backend用于存储任务的执行结果,以供查询,同消息中间件一样,存储也可以使用Redis、RabbitMQ和MongoDB等。
Django中使用Celery的工作流程:
- Django把耗时任务封装成一个方法
- 然后把耗时任务丢到队列(消息中间件)里面,
- Celery Worker监听到队列中有任务,就创建Worker,有多少个耗时任务就创建多少个Worker(可以是进程或者线程)
- 工人操作完任务以后就把结果放到存储中。
Celery在Django中使用官方文档连接:https://docs.celeryproject.org/en/stable/django/first-steps-with-django.html#using-celery-with-django
配置celery:
- Django == 1.11.11
- celery == 4.4.0
新建一个worker文件,创建__init__
文件
__init__.py
> 除了目录名,其他都是固定写法
import os
from celery import Celery
from worker import config
# 加载django的环境
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "主目录名.settings")
# 实例化celery
celery_app = Celery('swiper(主目录名)')
# 加载配置文件
celery_app.config_from_object(config)
# 自动注册任务
celery_app.autodiscover_tasks()
worker > 新建config.py , 用来指定消息中间件使用什么队列,这里使用Redis。
broker_url = 'redis://127.0.0.1:6379/0'
使用celery:封装短信验证码到func.py文件中
在公共目录common下,新建func.py文件 >
import random
from libs.yuntongxun.sms import CCP
from worker import celery_app
from django_redis import get_redis_connection
# 把django中第三方或者耗时的方法变成celery中的任务
@celery_app.task
def send_sms(phone):
ccp = CCP()
sms_code = create_sms_code(4)
# 把验证码存储redis的缓存中
redis_cli = get_redis_connection()
redis_cli.set(f'smscode-{phone}', sms_code, 18000)
ccp.send_template_sms(phone, [sms_code, 3], 1)
def create_sms_code(num):
start = 10 ** (num - 1)
end = 10 ** num - 1
return random.randint(start, end)
把celery任务放到队列中
# 把执行发短信的任务放入队列
# delay接收的参数就是被celery封装的参数,
# data.get("phone")获取用户输入的手机号
send_sms.delay(data.get("phone"))
启动celery消息队列:(Windows对多进程不是很友好,所以在Windows中使用单进程)
参数:
- -A 指定文件名 -l 自制输出格式 --pool=solo(单继承执行,Linux系统不需要)
celery worker -A worker -l info --pool=solo
异步任务已开启!!!worker开始监听