Celery简介
celery是一个专注于实时处理和任务调度的分布式任务队列。主要用来异步处理一些发送邮件或者短信之类的耗时操作.
工作流程为任务的生产者(producer)
产生任务, 把任务放入中间人(broker)
的队列中, 然后任务消费者(worker)
去broker
队列中获取任务, 并执行该任务, 若该任务有返回值, 则可以把返回值放入存储(backend)
中, 后续生产者
再去backend
提取结果.
在Flask中使用Celery
创建celery对象
from flask import Flask
from celery import Celery
# 创建flask的app对象
app = Flask(__name__)
# 可以将celery相关配置项配置到app的配置中
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/1'
# 创建celery对象, 必须指定broker, 不然worker启动时会报错
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
# 让celery对象使用app中的配置
celery.conf.update(app.config)
定义任务
# 定义的任务需要使用celery.task装饰器装饰
@celery.task
def my_task(arg1, arg2):
return arg1+arg2
调用任务
# 在flask后台逻辑中调用定义的任务, 注意要使用delay方法才能进行异步处理, 否则只会同步处理
task = my_task.delay(10, 20)
启动worker
# tasks为定义任务的文件路径
celery -A tasks worker -l=info
在本项目中使用celery发送短信
在flask配置文件中添加celery配置
# config.py
class BasicConfig:
......
# 远程服务器
REMOTE_SERVER = 'alex-gcx.com'
# redis中定义celery使用的2号库
REDIS_CELERY_DB = 2 # celery的broker/backend
# celery配置
CELERY_BROKER_URL = f'redis://{REMOTE_SERVER}:6379/{REDIS_CELERY_DB}'
CELERY_RESULT_BACKEND = CELERY_BROKER_URL # backend和broker使用同一个redis库
创建celery对象
一开始我是在flask app的应用工厂程序create_app
中创建的celery, 就像之前的redis_connect
一样
# ihome/api_1_0/__init__.py
celery = None # 初始化为None
# 创建应用工厂
def create_app():
......
app = Flask(__name__) # 创建flask应用
......
# 重新设置celery
global celery
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
定义任务
在ihome/api_1_0
下创建任务文件tasks.py
, 添加发送短信的任务代码
from ihome import celery
from ronglian_sms_sdk import SmsSDK
from ihome.utils import constants
from ihome.utils.response_codes import RET
from flask import current_app
import json
@celery.task
def send_sms_code(sms_code, phone):
"""发送短信任务"""
result = {'errno': RET.OK}
try:
sdk = SmsSDK(constants.ACCID, constants.ACCTOKEN, constants.APPID)
tid = '1'
mobile = phone
# 短信模板为:....您的验证码是{1},请于{2}分钟内正确输入
# data参数为元组,第一个值为模板中的{1},第二个值为模板中的{2}
data = (sms_code, constants.SMS_CODE_REDIS_EXPIRES // 60)
# 发送短信,接受返回值
sms_resp_json = sdk.sendMessage(tid, mobile, data)
except Exception as e:
current_app.logger.error(e)
result['errno'] = RET.THIRDERR
result['errmsg'] = '发送短信异常'
else:
# 处理返回值
sms_resp_dict = json.loads(sms_resp_json)
sms_status = sms_resp_dict.get('statusCode')
if sms_status not in ('000000', '112310'):
# 发送失败
result['errno'] = RET.THIRDERR
result['errmsg'] = sms_resp_dict.get('statusMsg')
return result
调用任务
在ihome/api_1_0/verify_codes.py
的短信接口中调用发送短信的任务
from . import tasks
@api.route("/sms_codes/<re(r'1[^120]d{9}'):phone>")
def send_sms_code(phone):
.......
tasks.send_sms_code.delay(sms_code, phone)
......
启动worker
在终端中进入ihome项目的根目录, 执行启动命令
(flask) alex@alex:~/python/FlaskIhome$ celery -A ihome.api_1_0.tasks worker -l=info
Error:
Unable to load celery application.
'nonetype' object has no attribute 'task'
解决celery创建对象的错误
-
报错说
nonetype
, 原来是启动worker时, 确实导入了celery对象, 但是这个对象只做了初始化为None的这一步, 并没有走应用工厂create_app
中的重设celery的方法. -
因为worker的启动命令是独立于flask应用启动的, 尽管那边已经把整个flask项目启动起来了, 但是worker的启动和flask的启动是没有关系的. 所以根据worker的启动命令, 在
tasks.py
文件中从ihome
包导入了celeryfrom ihome import celery
, 因此程序会执行ihome
的__init__.py
文件, 因此会执行celery = None
, 但是create_app
方法内部的代码并不会执行, 只是定义了一个create_app
方法而已, 所以最终celery
的结果就为None
-
但是因为这里需要让celery使用app对象的配置, 所以得在app的创建后才能创建celery对象, 于是把创建celery对象的语句移到了
manager.py
中创建app的语句app = create_app('dev')
后面# manager.py app = create_app('dev') # 创建celery对象 celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL']) celery.conf.update(app.config)
定义任务的
tasks.py
文件中导入celery对象语句也需要改成from manager import celery
-
再次运行worker启动命令, 发现可以正常启动
(flask) alex@alex:~/python/FlaskIhome$ celery -A ihome.api_1_0.tasks worker -l=info
-------------- celery@alex v4.4.7 (cliffs)
--- ***** -----
-- ******* ---- Linux-5.3.0-3-amd64-x86_64-with-Deepin-20-apricot 2020-08-26 12:28:44
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: ihome:0x7f281f2045f8
- ** ---------- .> transport: redis://alex-gcx.com:6379/2
- ** ---------- .> results: redis://alex-gcx.com:6379/2
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. ihome.api_1_0.tasks.send_sms_code
[2020-08-26 12:28:44,525: INFO/MainProcess] Connected to redis://alex-gcx.com:6379/2
[2020-08-26 12:28:44,617: INFO/MainProcess] mingle: searching for neighbors
[2020-08-26 12:28:45,788: INFO/MainProcess] mingle: all alone
[2020-08-26 12:28:46,034: INFO/MainProcess] celery@alex ready.
解决包循环导入的错误
但是在刷新flask应用的网页时, 又报错了:
ImportError: cannot import name 'celery' from 'manager' (/home/alex/python/FlaskIhome/manager.py)
一般报错说不能导入某个模块cannot import name xxxx
而不是找不到某个模块No module named xxxx
, 那就不是模块没有安装的问题, 而是循环导入的问题, 即导包时发生了死锁. 理一下导包的过程就能发现问题了.
- worker的启动文件
tasks.py
头部导入了manager
模块:from manager import celery
- 在
manager
模块导入并执行了ihome
模块的create_app
方法 - 在
create_app
方法中导入了蓝图api_1_0
:from ihome.api_1_0 import api
- 在蓝图
api_1_0
中导入了发送短信的视图文件from . import verify_codes
- 在视图文件
verify_codes
头部又导入了celery任务文件tasks:from . import tasks
所以解决办法是延迟导入: 把verify_codes.py
中导入任务from . import tasks
从头部移到send_sms_code
方法中调用任务tasks.send_sms_code.delay(sms_code, phone)
的上一行, 这样导入时就不会执行导入tasks
这句话了, 只有在运行调用任务的代码时才会进行导入tasks
, 即什么时候用再什么时候导入
@api.route("/sms_codes/<re(r'1[^120]d{9}'):phone>")
def send_sms_code(phone):
......
# 调用异步发送短信的任务
from . import tasks
result = tasks.send_sms_code.delay(sms_code, phone)
......
再次启动worker和flask应用, 点击发送短信验证码后, 就可以正常发送短信了
(flask) alex@alex:~/python/FlaskIhome$ celery -A ihome.api_1_0.tasks worker -l=info
-------------- celery@alex v4.4.7 (cliffs)
--- ***** -----
-- ******* ---- Linux-5.3.0-3-amd64-x86_64-with-Deepin-20-apricot 2020-08-26 13:12:13
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: ihome:0x7fa4b1b1c320
- ** ---------- .> transport: redis://alex-gcx.com:6379/2
- ** ---------- .> results: redis://alex-gcx.com:6379/2
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. ihome.api_1_0.tasks.send_sms_code
[2020-08-26 13:12:13,689: INFO/MainProcess] Connected to redis://alex-gcx.com:6379/2
[2020-08-26 13:12:13,732: INFO/MainProcess] mingle: searching for neighbors
[2020-08-26 13:12:14,876: INFO/MainProcess] mingle: all alone
[2020-08-26 13:12:15,149: INFO/MainProcess] celery@alex ready.
[2020-08-26 13:12:33,465: INFO/MainProcess] Received task: ihome.api_1_0.tasks.send_sms_code[4ce67842-48d7-4dd9-8af8-1704182f685c]
.........
优化celery的目录结构-模块化
上面的celery已经可以正常工作了, 但是我们将celery对象定义在了manager.py
文件中, 而当初我们规定manager.py
只是作为启动文件用的, 不应该定义项目业务相关的细节, 所以定义在这里不太合适.
并且我们只定义了一个tasks.py
文件, 如果项目比较大定义的任务比较多的话, 那都挤在一个任务文件中显然也是不合理的. 因此我们可以将celery单独定义成一个模块化的功能.
创建celery_tasks模块
在ihome
下创建一个新的python包, 名为celery_tasks
, 用来标识celery的任务模块, 在该目录下新建一个main.py
文件, 用来创建celery对象和管理任务以及worker的启动
# ihome/celery_tasks/main.py
from celery import Celery
from manager import app as flask_app
# 创建celery对象
celery = Celery(flask_app.name, broker=flask_app.config['CELERY_BROKER_URL'])
# 添加celery配置
celery.conf.update(flask_app.config)
注:
-
还是celery的配置项还是关联了flask的app对象, 当然也可以不必关联app对象, 直接另外定义一个celery的配置文件也是可以的
-
这里导入flask的app时, 不能使用flask的
current_app
, 因为前面说过了worker启动和flask启动没有关系, 因此启动worker时读取不到current_app
-
使用的是从
manager
中导入的app对象, 但是注意导入app后, 需要将app重命名一下, 如重命名为flask_app
, 因为celery的对象默认的名字也叫app, 这样celery读取的时候会报错:'flask' object has no attribute 'user_options'
(flask) alex@alex:~/python/FlaskIhome$ celery -A ihome.celery_tasks.main worker -l=info Error: Unable to load celery application. 'flask' object has no attribute 'user_options'
至于之前在
manager.py
中创建celery对象时也是使用的app并没有报错, 我猜想可能是因为manager.py
中的app是新创建的变量, 而不是从某个地方import
导入的, 而这里会报错是因为app是从外面导入过来的, 所以报错了吧.
创建发送短信任务模块
在ihome/celery_tasks
目录下再创建一个python包, 命名为send_sms_code
, 用来标识发送短信的任务, 再在其目录下创建tasks.py
文件, 里面编写具体的任务函数
from ihome.celery_tasks.main import celery
from ronglian_sms_sdk import SmsSDK
from ihome.utils import constants
from ihome.utils.response_codes import RET
from flask import current_app
import json
@celery.task
def send_sms_code(sms_code, phone):
"""发送短信任务"""
result = {'errno': RET.OK}
try:
sdk = SmsSDK(constants.ACCID, constants.ACCTOKEN, constants.APPID)
tid = '1'
mobile = phone
# 短信模板为:....您的验证码是{1},请于{2}分钟内正确输入
# data参数为元组,第一个值为模板中的{1},第二个值为模板中的{2}
data = (sms_code, constants.SMS_CODE_REDIS_EXPIRES // 60)
# 发送短信,接受返回值
sms_resp_json = sdk.sendMessage(tid, mobile, data)
except Exception as e:
current_app.logger.error(e)
result['errno'] = RET.THIRDERR
result['errmsg'] = '发送短信异常'
else:
# 处理返回值
sms_resp_dict = json.loads(sms_resp_json)
sms_status = sms_resp_dict.get('statusCode')
if sms_status not in ('000000', '112310'):
# 发送失败
result['errno'] = RET.THIRDERR
result['errmsg'] = sms_resp_dict.get('statusMsg')
return result
注:
-
把之前创建的
ihome/tasks.py
移过来就好了, 只是导入celery
的路径改成从main
中导入. -
如果还有其他任务, 比如发送邮件等, 就可以同样在
ihome/celery_tasks
目录下创建send_mail
python包, 并创建它下面的tasks.py
任务文件即可. -
注意把之前调用任务的业务代码
verify_codes.py
的导入任务语句路径改成:from ihome.celery_tasks.send_sms_code import tasks
将短信任务添加到main中管理
回到ihome/celery_tasks/main.py
中, 追加一句话, 可以将celery_tasks
下定义的具体任务模块添加到main中
# 自动搜索任务
celery.autodiscover_tasks(['ihome.celery_tasks.send_sms_code'])
注:
- 添加的是一个列表, 也就是说可以把多个任务都添加到列表中
- 列表中的名字是项目根目录的绝对路径, 只需要写到具体模块名(
send_sms_code
)就可以了, 前提是模块名下的任务文件必须名为tasks.py
, 否则需要写到具体的任务文件, 如'ihome.celery_tasks.send_sms_code.my_tasks'
最终celery文件结构
ihome/celery_tasks/
├── __init__.py
├── main.py
└── send_sms_code
├── __init__.py
└── tasks.py
再次启动flask应用和worker测试是否可以正常使用.注意启动脚本到是main.py
(flask) alex@alex:~/python/FlaskIhome$ celery -A ihome.celery_tasks.main worker -l=info