zoukankan      html  css  js  c++  java
  • Celery--分布式任务队列:

    任务队列用作一种在线程或计算机之间分配工作的机制。

    Celery用Python编写,但是该协议可以用任何语言实现。

    只要涉及到第三方服务(发短信,发邮件)的时候,建议用异步队列来实现。

    例如有一个四段服务是串行的,第一、二段和第四段是服务器本身的,而第三段是第三方任务,在程序运行时,第三方任务的耗时非常慢,串行的代码实现第三段服务了消耗的时间很长,造成用户体验差。这时候我们就可以使用异步来处理代码。异步就是把一段程序中的某个模块放到另一个进程执行的,在这里我们可以让第三段任务异步处理,让整个程序并行执行(单独把第三段拉出去执行),这样一来服务器本身只执行第一、二、四段。就算第三方任务就算执行时间很长,也不会影响整个代码流程了。

    Celery及异步任务的处理:

    Celery有两个功能,一个是处理异步任务,一个是处理定时任务。

    Celery处理任务的流程有三部分,分别是

    • 消息中间件(Broker) -- 消息中间件相当于一个队列,用来存放任务
    • 任务执行单元(Celery Worker) -- 负责监听消息中间件
    • 结果存储(Backend) -- 存储任务处理结果

    任务模块(Task):包含异步任务和定时任务,其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由Celery Beat 进程周期性地将任务发往任务队列。

    消息中间件(Broker):负责任务队列调度,接收任务生产者发来的任务,将任务存入队列,Celery本身不提供队列服务,官方推荐使用Redis和RabbitMQ等。

    任务执行单元(Worker): Worker是执行任务的处理单元,它实施监控消息队列,获取队列中调度的任务,并执行它。

    任务结果存储(Backend): Backend用于存储任务的执行结果,以供查询,同消息中间件一样,存储也可以使用Redis、RabbitMQ和MongoDB等。

    Django中使用Celery的工作流程:

    1. Django把耗时任务封装成一个方法
    2. 然后把耗时任务丢到队列(消息中间件)里面,
    3. Celery Worker监听到队列中有任务,就创建Worker,有多少个耗时任务就创建多少个Worker(可以是进程或者线程)
    4. 工人操作完任务以后就把结果放到存储中。

    Celery在Django中使用官方文档连接:https://docs.celeryproject.org/en/stable/django/first-steps-with-django.html#using-celery-with-django

    配置celery:

    • Django == 1.11.11
    • celery == 4.4.0

    新建一个worker文件,创建__init__文件

    __init__.py > 除了目录名,其他都是固定写法

    import os
    
    from celery import Celery
    
    from worker import config
    
    # 加载django的环境
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "主目录名.settings")
    
    # 实例化celery
    celery_app = Celery('swiper(主目录名)')
    
    # 加载配置文件
    celery_app.config_from_object(config)
    
    # 自动注册任务
    celery_app.autodiscover_tasks()
    

    worker > 新建config.py , 用来指定消息中间件使用什么队列,这里使用Redis。

    broker_url = 'redis://127.0.0.1:6379/0'
    

    使用celery:封装短信验证码到func.py文件中

    在公共目录common下,新建func.py文件 >

    import random
    
    from libs.yuntongxun.sms import CCP
    from worker import celery_app
    from django_redis import get_redis_connection
    
    
    # 把django中第三方或者耗时的方法变成celery中的任务
    @celery_app.task
    def send_sms(phone):
        ccp = CCP()
    
        sms_code = create_sms_code(4)
    
        # 把验证码存储redis的缓存中
        redis_cli = get_redis_connection()
        redis_cli.set(f'smscode-{phone}', sms_code, 18000)
    
        ccp.send_template_sms(phone, [sms_code, 3], 1)
    
    def create_sms_code(num):
    
        start = 10 ** (num - 1)
        end = 10 ** num - 1
    
        return random.randint(start, end)
    

    把celery任务放到队列中

    # 把执行发短信的任务放入队列
    # delay接收的参数就是被celery封装的参数,
    # data.get("phone")获取用户输入的手机号
    send_sms.delay(data.get("phone"))
    

    启动celery消息队列:(Windows对多进程不是很友好,所以在Windows中使用单进程)

    参数:

    • -A 指定文件名 -l 自制输出格式 --pool=solo(单继承执行,Linux系统不需要)
    celery worker -A worker -l info --pool=solo
    

    异步任务已开启!!!worker开始监听

  • 相关阅读:
    消息队列接口API(posix 接口和 system v接口)
    Ubuntu 安装 Eclipse C/C++开发环境
    Ubuntu下Eclipse搭建ARM开发环境
    Linux进程间通信——使用流套接字
    Linux进程间通信——使用数据报套接字
    Linux进程间通信——信号集函数
    Linux进程间通信——使用信号
    Linux进程间通信——使用匿名管道
    mappedBy的作用
    VS Code 配置 C/C++ 环境
  • 原文地址:https://www.cnblogs.com/lance-lzj/p/13961682.html
Copyright © 2011-2022 走看看