官方文档
生产者消费者设计模式
生产者生成消息,缓存到消息队列中,消费者读取消息队列中的消息并执行,
- Celery介绍
- 一个简单、灵活且可靠、处理大量消息的分布式系统,可以在一台或者多台机器上运行。
- 单个 Celery 进程每分钟可处理数以百万计的任务。
- 通过消息进行通信,使用消息队列( broker )在客户端和消费者之间进行协调。
创建 Celery 实例并加载配置
1.定义 Celery 包
- 包里有两个文件main.py和config.py
2.main.py
# 导入 Celery 类
from celery import Celery
# 创建 celery 实例
# 需要添加一个参数,是个字符串, 内容随意添加
celery_app = Celery('meiduo')
# 给 celery 添加配置
# 里面的参数为我们创建的 config 配置文件:
celery_app.config_from_object('celery_tasks.config')
3.config.py
# 添加消息队列的地址
broker_url = 'redis://127.0.0.1:6379/3'
4.定义任务
- 建个包,再新建task.py
# bind:保证task对象会作为第一个参数自动传入
# name:异步任务别名
# retry_backoff:异常自动重试的时间间隔 第n次(retry_backoff×2^(n-1))s
# max_retries:异常自动重试次数的上限
@celery_app.task(bind=True, name='ccp_send_sms_code', retry_backoff=3)
def ccp_send_sms_code(self, mobile, sms_code):
"""
发送短信异步任务
:param mobile: 手机号
:param sms_code: 短信验证码
:return: 成功0 或 失败-1
"""
try:
# 调用 CCP() 发送短信, 并传递相关参数:
result = CCP().send_template_sms(mobile,
[sms_code, 5],
1)
except Exception as e:
# 如果发送过程出错, 打印错误日志
logger.error(e)
# 有异常自动重试三次
raise self.retry(exc=e, max_retries=3)
# 如果发送成功, rend_ret 为 0:
if result != 0:
# 有异常自动重试三次
raise self.retry(exc=Exception('发送短信失败'), max_retries=3)
return result
5.启动Celery服务
- 在能看到Celery包的路径下,调用下面的命令行:
·celery -A celery_tasks.main worker -l info·
6.调用发送短信的任务
# Celery 异步发送短信验证码
ccp_send_sms_code.delay(mobile, sms_code)
更改celery工作模式
- 默认是进程的方式,一个cpu默认开四个进程
- 改为协程
pip install eventlet