zoukankan      html  css  js  c++  java
  • 4.celery异步并发处理

    Celery

    1.简介

    基于python开发的

    Celery是一个功能完备即插即用的异步任务队列系统。它适用于异步处理问题,当发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作,我们可将其异步执行,这样用户不需要等待很久,提高用户体验。

    文档:http://docs.jinkan.org/docs/celery/getting-started/index.html

    作用:1.异步处理问题 2.定时任务

    Celery的特点是:

    • 简单,易于使用和维护,有丰富的文档。
    • 高效,单个celery进程每分钟可以处理数百万个任务。
    • 灵活,celery中几乎每个部分都可以自定义扩展。
    任务队列是一种跨线程、跨机器工作的一种机制.
    	任务队列中包含称作任务的工作单元。有专门的工作进程持续不断的监视任务队列,并从中获得新的任务并处理.
    celery通过消息进行通信,通常使用一个叫Broker(中间人)来协助client(任务的发出者)和worker(任务的处理者). clients发出消息到队列中,broker将队列中的信息派发给worker来处理。
    

    Celery的运作流程

    Celery的架构由三部分组成,消息队列(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

    
    一个celery系统可以包含很多的worker和broker
    
    Celery本身不提供消息队列功能,但是可以很方便地和第三方提供的任务队列进行集成,包括RabbitMQ ,Redis, MongoDB 等
    

    2.安装

    pip install -U celery==4.3.0
    
    # 提醒:因为celery默认使用了多进程,所以在celery4.0.0版本以后,不支持windows系统的。
    # 在windows下面即便安装了celery4.0.0以前的版本,其实也有兼容问题的。
    

    也可从官方直接下载安装包:https://pypi.python.org/pypi/celery/

    tar xvfz celery-0.0.0.tar.gz
    cd celery-0.0.0
    python setup.py build
    python setup.py install
    

    安装完成了celery以后,接下来要确保服务器下有任务队列,可以是redis,也可以是RabbitMQ。在这里,我们已经安装了redis,可以直接使用redis作为任务队列,也可以使用RabbitMQ【注意,我们提供的ubuntu下是没有RBMQ的,安装文档:https://blog.csdn.net/qq_22638399/article/details/81704372】

    3.使用

    使用celery第一件要做的最为重要的事情是需要先创建一个Celery实例,我们一般叫做celery应用,或者更简单直接叫做一个app。app应用是我们使用celery所有功能的入口,比如创建任务,管理任务等,在使用celery的时候,app必须能够被其他的模块导入。

    1.一般celery任务目录直接放在项目的根目录下创建mycelery包,路径:

    renranapi/
    ├── mycelery/
        ├── config.py     # 配置文件
        ├── __init__.py   
        ├── main.py       # 主程序,导入celery并进行任务注册和加载配置
        └── sms/          # 一个目录可以放置多个任务,该目录下存放当前任务执行时需要的模块或依赖
            └── tasks.py  # 任务的文件,名称必须是这个!!!
    

    2.mycelery目录下创建文件(名字随意),例如: main.py,代码:

    # 主程序
    from celery import Celery
    # 1.创建celery实例对象,可以实例化多个对象,可以指定名称'renran',也可以不指定
    app = Celery("renran")
    
    # 2.通过app对象加载配置
    app.config_from_object("mycelery.config")
    
    # 3.自动搜索并加载任务
    # 参数必须必须是一个列表,里面的每一个任务都是任务的路径名称(tasks.py文件的上一层目录路径)
    # app.autodiscover_tasks(["任务1","任务2",....])
    
    app.autodiscover_tasks(["mycelery.mytasks"])
    
    

    celery的配置项文档:https://docs.celeryproject.org/en/stable/userguide/configuration.html

    3.配置文件config.py,代码:

    # 任务队列的链接地址
    broker_url = 'redis://127.0.0.1:6379/15'
    # 结果队列的链接地址
    result_backend = 'redis://127.0.0.1:6379/14'
    

    4.创建一个任务文件mytasks/tasks.py,并创建任务,代码:

    # celery的任务必须写在tasks.py的文件中,别的文件名称不识别!!!
    from mycelery.main import app  # 引入示例化对象
    
    @app.task  
    # name表示设置任务的名称,如果不填写,路径做任务名mycelery.mytasks.tasks.sms_code1
    def send_sms():
        print("发送短信!!!")
    
    @app.task(name="send_sms2")  # name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名
    def send_sms2():
        print("发送短信任务2!!!")
    

    5.启动运行celery:

    # 启动Celery的命令
    # 强烈建议切换目录到项目的根目录下启动celery!!
    # 也就是你创建celery目录的上一层目录下启动
    celery -A mycelery.main worker --loglevel=info
    

    6.调用执行任务 -- celery目录下随意创建文件:xx.py

    # 调用celery执行异步任务
    from mycelery.mytasks.tasks import sms_code2
    task_obj = send_sms.delay() # 返回任务id --task_id
    

    1.把django和celery进行组合

    发送短信任务添加到celery任务中

    1.在mycelery/mytasks/tasks.py文件中添加发送短信任务

    from mycelery.main import app
    from django.conf import settings
    from ronglian_sms_sdk import SmsSDK
    from renranapi.settings import contains
    
    # 自动把函数加载到队列中
    @app.task(name = 'send_sms')
    def send_sms(mobile,code):
        # 参数获取
        accId = settings.SMS_INFO.get('accId')
        accToken = settings.SMS_INFO.get('accToken')
        appId = settings.SMS_INFO.get('appId')
        tid = settings.SMS_INFO.get('tid').get('register')  # 荣联云短信模板id
        expire_time = contains.SMS_CODE_EXPIRE_TIME  # 短信过期时间
    
        # 发送短信逻辑
        sdk = SmsSDK(accId, accToken, appId)
        sdk.sendMessage(tid, mobile, (code, expire_time // 60))
    
        return '发送成功'
    

    2.users/views.py视图中引入发送短信任务

    # 手机短信
    import random
    from renranapi.settings import contains # 引入常量
    from django_redis import get_redis_connection  # 引入redis库
    import logging # 记录日志
    from .utils import get_user_object # 验证手机号
    from mycelery.mytasks.tasks import send_sms # 引入发送短信任务
    
    # 短信验证码
    class SmsView(APIView):
    
        def get(self, request):
            try:
                mobile = request.query_params.get('mobile')  # 手机号
                # 发送短信需要的参数
                # accId = settings.SMS_INFO.get('accId')
                # accToken = settings.SMS_INFO.get('accToken')
                # appId = settings.SMS_INFO.get('appId')
                # tid = settings.SMS_INFO.get('tid').get('register')  # 荣联云短信模板id
                expire_time = contains.SMS_CODE_EXPIRE_TIME   # 短信过期时间
                interval_time = contains.SMS_CODE_INTERVAL_TIME # 短信间隔时间
    
                conn = get_redis_connection('sms_code') # 拿到储存短信验证码库
    
                # 先验证手机号是否已经被注册
                user = get_user_object(mobile)
                if user:
                    return Response({'error':'该手机号已经被注册,请核实'},status=status.HTTP_400_BAD_REQUEST)
    
                # 短信间隔内,短信是否已经发送过
                inter_code = conn.get(f'sms_interval_{mobile}')
                if inter_code:
                    return Response({'error':'60秒内已经发送过短信了,请稍等!'},status=status.HTTP_400_BAD_REQUEST)
    
                # 生成短信验证码
                code = '%04d' % random.randint(0, 9999)  # 随机4位短信验证码
    
                # 保存短信验证码
                # redis优化 一次链接储存多个数据,尽量减少链接
                pipe = conn.pipeline()
                pipe.multi() # 开启redis的批量操作
    
                pipe.setex(f'sms_{mobile}',expire_time,code) # 存有效期数据
                pipe.setex(f'sms_interval_{mobile}',interval_time,code) # 存间隔时间数据
    
                pipe.execute() #执行批量操作
    
                # 发送短信验证码
                # sdk = SmsSDK(accId, accToken, appId)
                # sdk.sendMessage(tid, mobile, (code, expire_time//60))
                send_sms.delay(mobile,code)
    
                return Response({'msg': 'ok'})
            except Exception as e:
                # 发生错误,记录日志
                logger = logging.getLogger('django')
                logger.error(f'{mobile}--短信发送失败--{str(e)}')
    
                return Response({'error':'短信发送失败'},status=status.HTTP_500_INTERNAL_SERVER_ERROR)
    

    3.在main.py主程序中对django的配置文件进行加载

    from celery import Celery
    import os
    import django
    
    # 把celery和django进行组合,识别和加载django的配置文件
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'renranapi.settings.dev')
    # 对django框架执行初始化
    django.setup()
    
    # 实例化对象
    app = Celery('renran')
    # 加载配置
    app.config_from_object('mycelery.config')
    # 自动搜索并加载任务
    app.autodiscover_tasks(['mycelery.mytasks'])
    

    再次启动项目celery。

    最终在django里面,我们调用Celery来异步执行任务。需要完成2个步骤:

    # 1. 声明一个和celery一模一样的任务函数,但是我们可以导包来解决
    from mycelery.mytasks.tasks import send_sms
    
    # 2. 调用任务函数,发布任务
    send_sms.delay(mobile,code)
    # send_sms.delay() 如果调用的任务函数没有参数,则不需要填写任何内容
    
  • 相关阅读:
    AcWing 1135. 新年好 图论 枚举
    uva 10196 将军 模拟
    LeetCode 120. 三角形最小路径和 dp
    LeetCode 350. 两个数组的交集 II 哈希
    LeetCode 174. 地下城游戏 dp
    LeetCode 面试题 16.11.. 跳水板 模拟
    LeetCode 112. 路径总和 递归 树的遍历
    AcWing 1129. 热浪 spfa
    Thymeleaf Javascript 取值
    Thymeleaf Javascript 取值
  • 原文地址:https://www.cnblogs.com/jia-shu/p/14589715.html
Copyright © 2011-2022 走看看