zoukankan      html  css  js  c++  java
  • Flask-爱家租房项目ihome-06-celery发送短信

    Celery简介

    celery是一个专注于实时处理和任务调度的分布式任务队列。主要用来异步处理一些发送邮件或者短信之类的耗时操作.

    工作流程为任务的生产者(producer)产生任务, 把任务放入中间人(broker)的队列中, 然后任务消费者(worker)broker队列中获取任务, 并执行该任务, 若该任务有返回值, 则可以把返回值放入存储(backend)中, 后续生产者再去backend提取结果.

    image-20200826105229560

    在Flask中使用Celery

    创建celery对象

    from flask import Flask
    from celery import Celery
    # 创建flask的app对象
    app = Flask(__name__)
    # 可以将celery相关配置项配置到app的配置中
    app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
    app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/1'
    # 创建celery对象, 必须指定broker, 不然worker启动时会报错
    celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
    # 让celery对象使用app中的配置
    celery.conf.update(app.config)
    

    定义任务

    # 定义的任务需要使用celery.task装饰器装饰
    @celery.task
    def my_task(arg1, arg2):
        return arg1+arg2
    

    调用任务

    # 在flask后台逻辑中调用定义的任务, 注意要使用delay方法才能进行异步处理, 否则只会同步处理
    task = my_task.delay(10, 20)
    

    启动worker

    # tasks为定义任务的文件路径
    celery -A tasks worker -l=info
    

    在本项目中使用celery发送短信

    在flask配置文件中添加celery配置

    # config.py
    class BasicConfig:
        ......
        # 远程服务器
        REMOTE_SERVER = 'alex-gcx.com'
        # redis中定义celery使用的2号库
        REDIS_CELERY_DB = 2  # celery的broker/backend
    	# celery配置
        CELERY_BROKER_URL = f'redis://{REMOTE_SERVER}:6379/{REDIS_CELERY_DB}'
        CELERY_RESULT_BACKEND = CELERY_BROKER_URL  # backend和broker使用同一个redis库
    

    创建celery对象

    一开始我是在flask app的应用工厂程序create_app中创建的celery, 就像之前的redis_connect一样

    # ihome/api_1_0/__init__.py
    celery = None  # 初始化为None
    # 创建应用工厂
    def create_app():
        ......
        app = Flask(__name__)  # 创建flask应用
        ......
        # 重新设置celery
        global celery
        celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
    

    定义任务

    ihome/api_1_0下创建任务文件tasks.py, 添加发送短信的任务代码

    from ihome import celery
    from ronglian_sms_sdk import SmsSDK
    from ihome.utils import constants
    from ihome.utils.response_codes import RET
    from flask import current_app
    import json
    
    @celery.task
    def send_sms_code(sms_code, phone):
        """发送短信任务"""
        result = {'errno': RET.OK}
        try:
            sdk = SmsSDK(constants.ACCID, constants.ACCTOKEN, constants.APPID)
            tid = '1'
            mobile = phone
            # 短信模板为:....您的验证码是{1},请于{2}分钟内正确输入
            # data参数为元组,第一个值为模板中的{1},第二个值为模板中的{2}
            data = (sms_code, constants.SMS_CODE_REDIS_EXPIRES // 60)
            # 发送短信,接受返回值
            sms_resp_json = sdk.sendMessage(tid, mobile, data)
        except Exception as e:
            current_app.logger.error(e)
            result['errno'] = RET.THIRDERR
            result['errmsg'] = '发送短信异常'
        else:
            # 处理返回值
            sms_resp_dict = json.loads(sms_resp_json)
            sms_status = sms_resp_dict.get('statusCode')
            if sms_status not in ('000000', '112310'):
                # 发送失败
                result['errno'] = RET.THIRDERR
                result['errmsg'] = sms_resp_dict.get('statusMsg')
        return result
    

    调用任务

    ihome/api_1_0/verify_codes.py的短信接口中调用发送短信的任务

    from . import tasks
    
    @api.route("/sms_codes/<re(r'1[^120]d{9}'):phone>")
    def send_sms_code(phone):
    	.......
        tasks.send_sms_code.delay(sms_code, phone)
        ......
    

    启动worker

    在终端中进入ihome项目的根目录, 执行启动命令

    (flask) alex@alex:~/python/FlaskIhome$ celery -A ihome.api_1_0.tasks worker -l=info
    Error: 
    Unable to load celery application.
    'nonetype' object has no attribute 'task'
    

    解决celery创建对象的错误

    • 报错说nonetype, 原来是启动worker时, 确实导入了celery对象, 但是这个对象只做了初始化为None的这一步, 并没有走应用工厂create_app中的重设celery的方法.

    • 因为worker的启动命令是独立于flask应用启动的, 尽管那边已经把整个flask项目启动起来了, 但是worker的启动和flask的启动是没有关系的. 所以根据worker的启动命令, 在tasks.py文件中从ihome包导入了celeryfrom ihome import celery, 因此程序会执行ihome__init__.py文件, 因此会执行celery = None, 但是create_app方法内部的代码并不会执行, 只是定义了一个create_app方法而已, 所以最终celery的结果就为None

    • 但是因为这里需要让celery使用app对象的配置, 所以得在app的创建后才能创建celery对象, 于是把创建celery对象的语句移到了manager.py中创建app的语句app = create_app('dev')后面

      # manager.py
      app = create_app('dev')
      # 创建celery对象
      celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
      celery.conf.update(app.config)
      

      定义任务的tasks.py文件中导入celery对象语句也需要改成from manager import celery

    • 再次运行worker启动命令, 发现可以正常启动

    (flask) alex@alex:~/python/FlaskIhome$ celery -A ihome.api_1_0.tasks worker -l=info
     
     -------------- celery@alex v4.4.7 (cliffs)
    --- ***** ----- 
    -- ******* ---- Linux-5.3.0-3-amd64-x86_64-with-Deepin-20-apricot 2020-08-26 12:28:44
    - *** --- * --- 
    - ** ---------- [config]
    - ** ---------- .> app:         ihome:0x7f281f2045f8
    - ** ---------- .> transport:   redis://alex-gcx.com:6379/2
    - ** ---------- .> results:     redis://alex-gcx.com:6379/2
    - *** --- * --- .> concurrency: 8 (prefork)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** ----- 
     -------------- [queues]
                    .> celery           exchange=celery(direct) key=celery
    [tasks]
      . ihome.api_1_0.tasks.send_sms_code
    
    [2020-08-26 12:28:44,525: INFO/MainProcess] Connected to redis://alex-gcx.com:6379/2                                                                                   
    [2020-08-26 12:28:44,617: INFO/MainProcess] mingle: searching for neighbors
    [2020-08-26 12:28:45,788: INFO/MainProcess] mingle: all alone
    [2020-08-26 12:28:46,034: INFO/MainProcess] celery@alex ready.
    

    解决包循环导入的错误

    但是在刷新flask应用的网页时, 又报错了:

    ImportError: cannot import name 'celery' from 'manager' (/home/alex/python/FlaskIhome/manager.py)
    

    一般报错说不能导入某个模块cannot import name xxxx而不是找不到某个模块No module named xxxx, 那就不是模块没有安装的问题, 而是循环导入的问题, 即导包时发生了死锁. 理一下导包的过程就能发现问题了.

    • worker的启动文件tasks.py头部导入了manager模块: from manager import celery
    • manager模块导入并执行了ihome模块的create_app方法
    • create_app方法中导入了蓝图api_1_0: from ihome.api_1_0 import api
    • 在蓝图api_1_0中导入了发送短信的视图文件from . import verify_codes
    • 在视图文件verify_codes头部又导入了celery任务文件tasks: from . import tasks

    image-20200826130459411

    所以解决办法是延迟导入: 把verify_codes.py中导入任务from . import tasks从头部移到send_sms_code方法中调用任务tasks.send_sms_code.delay(sms_code, phone)的上一行, 这样导入时就不会执行导入tasks这句话了, 只有在运行调用任务的代码时才会进行导入tasks, 即什么时候用再什么时候导入

    @api.route("/sms_codes/<re(r'1[^120]d{9}'):phone>")
    def send_sms_code(phone):
        ......
        # 调用异步发送短信的任务
        from . import tasks
        result = tasks.send_sms_code.delay(sms_code, phone)
        ......
    

    再次启动worker和flask应用, 点击发送短信验证码后, 就可以正常发送短信了

    (flask) alex@alex:~/python/FlaskIhome$ celery -A ihome.api_1_0.tasks worker -l=info                                               
     -------------- celery@alex v4.4.7 (cliffs)
    --- ***** ----- 
    -- ******* ---- Linux-5.3.0-3-amd64-x86_64-with-Deepin-20-apricot 2020-08-26 13:12:13
    - *** --- * --- 
    - ** ---------- [config]
    - ** ---------- .> app:         ihome:0x7fa4b1b1c320
    - ** ---------- .> transport:   redis://alex-gcx.com:6379/2
    - ** ---------- .> results:     redis://alex-gcx.com:6379/2
    - *** --- * --- .> concurrency: 8 (prefork)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** ----- 
     -------------- [queues]
                    .> celery           exchange=celery(direct) key=celery
    
    [tasks]
      . ihome.api_1_0.tasks.send_sms_code
    
    [2020-08-26 13:12:13,689: INFO/MainProcess] Connected to redis://alex-gcx.com:6379/2
    [2020-08-26 13:12:13,732: INFO/MainProcess] mingle: searching for neighbors
    [2020-08-26 13:12:14,876: INFO/MainProcess] mingle: all alone
    [2020-08-26 13:12:15,149: INFO/MainProcess] celery@alex ready.
    [2020-08-26 13:12:33,465: INFO/MainProcess] Received task: ihome.api_1_0.tasks.send_sms_code[4ce67842-48d7-4dd9-8af8-1704182f685c]                                     
    .........
    

    优化celery的目录结构-模块化

    上面的celery已经可以正常工作了, 但是我们将celery对象定义在了manager.py文件中, 而当初我们规定manager.py只是作为启动文件用的, 不应该定义项目业务相关的细节, 所以定义在这里不太合适.

    并且我们只定义了一个tasks.py文件, 如果项目比较大定义的任务比较多的话, 那都挤在一个任务文件中显然也是不合理的. 因此我们可以将celery单独定义成一个模块化的功能.

    创建celery_tasks模块

    ihome下创建一个新的python包, 名为celery_tasks, 用来标识celery的任务模块, 在该目录下新建一个main.py文件, 用来创建celery对象和管理任务以及worker的启动

    # ihome/celery_tasks/main.py
    from celery import Celery
    from manager import app as flask_app
    
    # 创建celery对象
    celery = Celery(flask_app.name, broker=flask_app.config['CELERY_BROKER_URL'])
    # 添加celery配置
    celery.conf.update(flask_app.config)
    

    注:

    1. 还是celery的配置项还是关联了flask的app对象, 当然也可以不必关联app对象, 直接另外定义一个celery的配置文件也是可以的

    2. 这里导入flask的app时, 不能使用flask的current_app, 因为前面说过了worker启动和flask启动没有关系, 因此启动worker时读取不到current_app

    3. 使用的是从manager中导入的app对象, 但是注意导入app后, 需要将app重命名一下, 如重命名为flask_app, 因为celery的对象默认的名字也叫app, 这样celery读取的时候会报错:'flask' object has no attribute 'user_options'

      (flask) alex@alex:~/python/FlaskIhome$ celery -A ihome.celery_tasks.main worker -l=info
      Error: 
      Unable to load celery application.
      'flask' object has no attribute 'user_options'
      

      至于之前在manager.py中创建celery对象时也是使用的app并没有报错, 我猜想可能是因为manager.py中的app是新创建的变量, 而不是从某个地方import导入的, 而这里会报错是因为app是从外面导入过来的, 所以报错了吧.

    创建发送短信任务模块

    ihome/celery_tasks目录下再创建一个python包, 命名为send_sms_code, 用来标识发送短信的任务, 再在其目录下创建tasks.py文件, 里面编写具体的任务函数

    from ihome.celery_tasks.main import celery
    from ronglian_sms_sdk import SmsSDK
    from ihome.utils import constants
    from ihome.utils.response_codes import RET
    from flask import current_app
    import json
    
    @celery.task
    def send_sms_code(sms_code, phone):
        """发送短信任务"""
        result = {'errno': RET.OK}
        try:
            sdk = SmsSDK(constants.ACCID, constants.ACCTOKEN, constants.APPID)
            tid = '1'
            mobile = phone
            # 短信模板为:....您的验证码是{1},请于{2}分钟内正确输入
            # data参数为元组,第一个值为模板中的{1},第二个值为模板中的{2}
            data = (sms_code, constants.SMS_CODE_REDIS_EXPIRES // 60)
            # 发送短信,接受返回值
            sms_resp_json = sdk.sendMessage(tid, mobile, data)
        except Exception as e:
            current_app.logger.error(e)
            result['errno'] = RET.THIRDERR
            result['errmsg'] = '发送短信异常'
        else:
            # 处理返回值
            sms_resp_dict = json.loads(sms_resp_json)
            sms_status = sms_resp_dict.get('statusCode')
            if sms_status not in ('000000', '112310'):
                # 发送失败
                result['errno'] = RET.THIRDERR
                result['errmsg'] = sms_resp_dict.get('statusMsg')
        return result
    

    注:

    1. 把之前创建的ihome/tasks.py移过来就好了, 只是导入celery的路径改成从main中导入.

    2. 如果还有其他任务, 比如发送邮件等, 就可以同样在ihome/celery_tasks目录下创建send_mailpython包, 并创建它下面的tasks.py任务文件即可.

    3. 注意把之前调用任务的业务代码verify_codes.py的导入任务语句路径改成: from ihome.celery_tasks.send_sms_code import tasks

    将短信任务添加到main中管理

    回到ihome/celery_tasks/main.py中, 追加一句话, 可以将celery_tasks下定义的具体任务模块添加到main中

    # 自动搜索任务
    celery.autodiscover_tasks(['ihome.celery_tasks.send_sms_code'])
    

    注:

    1. 添加的是一个列表, 也就是说可以把多个任务都添加到列表中
    2. 列表中的名字是项目根目录的绝对路径, 只需要写到具体模块名(send_sms_code)就可以了, 前提是模块名下的任务文件必须名为tasks.py, 否则需要写到具体的任务文件, 如'ihome.celery_tasks.send_sms_code.my_tasks'

    最终celery文件结构

    ihome/celery_tasks/
    ├── __init__.py
    ├── main.py
    └── send_sms_code
        ├── __init__.py
        └── tasks.py
    

    再次启动flask应用和worker测试是否可以正常使用.注意启动脚本到是main.py

    (flask) alex@alex:~/python/FlaskIhome$ celery -A ihome.celery_tasks.main worker -l=info
    
  • 相关阅读:
    Python-asyncio
    Python-异步编程
    软件工程个人作业01
    《构建之法》阅读笔记6
    《构建之法》阅读笔记5
    《构建之法》阅读笔记4
    《构建之法》阅读笔记3
    《构建之法》第二章阅读笔记
    《构建之法》第一章阅读笔记
    开发web信息管理系统用到的相关技术
  • 原文地址:https://www.cnblogs.com/gcxblogs/p/13565993.html
Copyright © 2011-2022 走看看