zoukankan      html  css  js  c++  java
  • python测试开发django158.celery 学习与使用 上海

    前言

    Celery是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具。它是一个任务队列,专注于实时处理,同时还支持任务调度。
    可以使用的场景如:

    • 异步发邮件,这个时候 只需要提交任务给celery 就可以了.之后 由worker 进行发邮件的操作 .
    • 跑批接口的任务,需要耗时比较长,这个时候 也可以做成异步任务 .
    • 定时调度任务等

    Celery 简介

    Celery 扮演生产者和消费者的角色,先了解一下什么是生产者消费者模式。
    该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据,如下图所示:

    接下来需要弄清楚几个问题,谁生产数据(Task),谁是中间件(Broker),谁来消费数据(Worker),消费完之后运行结果(backend)在哪?

    看下图就很清楚了

    celery 的5个角色

    • Task 就是任务,有异步任务(Async Task)和定时任务(Celery Beat)
    • Broker 中间人,接收生产者发来的消息即Task,将任务存入队列。任务的消费者是Worker。Celery 本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务。
    • Worker 执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。
    • Beat 定时任务调度器,根据配置定时将任务发送给Broker。
    • Backend 用于存储任务的执行结果。

    使用环境

    Celery 本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务。那么需要先安装Redis之类的中间件

    docker pull redis:latest
    docker run -itd --name redis-test -p 6379:6379 redis
    

    上面是没有设置密码的,设置密码用下面这句

    docker run -itd --name myredis   -p 6379:6379 redis --requirepass "123456" --restart=always --appendonly yes 
    

    pip 安装相关依赖包

    pip install celery==3.1.26.post2
    pip install redis==2.10.6
    

    Task 任务

    先写个最简单的demo,新建一个tasks.py文件,task任务需使用@shared_task装饰器

    from celery import Celery
    from celery import shared_task
    
    # 实例化,添加broker地址
    app = Celery('tasks', broker='redis://ip:6379')
    
    
    @shared_task
    def add(x, y):
        return x + y
    

    打开 tasks.py 所在的目录,启动 worker,-A 参数表示的是 Celery APP 的名,这里指的是 tasks.py。
    worker 是一个执行任务角色,后面的 loglevel=info 记录日志类型默认是 info。

    celery -A tasks worker --loglevel=info
    

    运行结果

    D:\demo\demo\aaa>celery -A tasks worker --loglevel=info
    [2021-10-19 09:12:01,168: WARNING/MainProcess] e:\python36\lib\site-packages\celery\apps\worker.py:161: CDeprecationWarning:
    Starting from version 3.2 Celery will refuse to accept pickle by default.
    
    The pickle serializer is a security concern as it may give attackers
    the ability to execute any command.  It's important to secure
    your broker from unauthorized access when using pickle, so we think
    that enabling pickle should require a deliberate action and not be
    the default choice.
    
    If you depend on pickle then you should set a setting to disable this
    warning and to be sure that everything will continue working
    when you upgrade to Celery 3.2::
    
        CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
    
    You must only enable the serializers that you will actually use.
    
    
      warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))
    
     -------------- celery@DESKTOP-HJ487C8 v3.1.26.post2 (Cipater)
    ---- **** -----
    --- * ***  * -- Windows-10-10.0.17134-SP0
    -- * - **** ---
    - ** ---------- [config]
    - ** ---------- .> app:         tasks:0x2148e024c50
    - ** ---------- .> transport:   redis://localhost:6379//
    - ** ---------- .> results:     disabled://
    - *** --- * --- .> concurrency: 4 (prefork)
    -- ******* ----
    --- ***** ----- [queues]
     -------------- .> celery           exchange=celery(direct) key=celery
    
    
    [tasks]
      . tasks.add
    
    [2021-10-19 09:12:01,276: INFO/MainProcess] Connected to redis://localhost:6379//
    [2021-10-19 09:12:01,365: INFO/MainProcess] mingle: searching for neighbors
    [2021-10-19 09:12:02,761: INFO/MainProcess] mingle: all alone
    [2021-10-19 09:12:03,313: WARNING/MainProcess] celery@DESKTOP-HJ487C8 ready.
    

    从运行日志看到有tasks任务

    [tasks]
      . tasks.add
    

    看到Connected to redis说明已经连接成功了

    触发任务(delay)

    任务已经创建了,那么什么时候去触发这个任务呢,我们需要在代码里面去触发这个任务,接着上面代码继续写

    @shared_task
    def add(x, y):
        return x + y
    
    if __name__ == '__main__':
        # 触发任务
        res = add.delay(10, 15)
        print(res)
        print(type(res))  # AsyncResult
    

    运行结果

    7492f49b-6735-46fb-a16d-9ec24bd31e56
    <class 'celery.result.AsyncResult'>
    

    通过add任务,调用 .delay() 方法来触发一次任务,返回 AsyncResult 类,那么执行的任务结果都在 AsyncResult 类里

    运行 的时候查看 worker 运行日志,可以看到已经接收到任务Received task,每个任务会生成一个uuid的task_id,不会重复

    [2021-10-19 09:24:14,356: INFO/MainProcess] Received task: tasks.add[885a79ba-c87c-49f7-a23f-0824142c3c98]
    [2021-10-19 09:24:14,395: INFO/MainProcess] Task tasks.add[885a79ba-c87c-49f7-a23f-0824142c3c98] succeeded in 0.046999999998661224s: 25
    

    workder 会自动监听到推过来的任务,然后执行,可以看到执行结果'succeeded '

    backend 任务结果

    调用 .delay() 方法触发任务后,返回 AsyncResult 类,可以查看任务的状态,任务id和任务结果

    D:\demo\demo\aaa>python
    Python 3.6.6 (v3.6.6:4cf1f54eb7, Jun 27 2018, 03:37:03) [MSC v.1900 64 bit (AMD64)] on win32
    Type "help", "copyright", "credits" or "license" for more information.
    
    >>> from tasks import add
    >>> res = add.delay(10, 15)
    >>> res.task_id
    '6a7c8e10-7192-4865-9108-3e98596b9d37'
    >>>
    >>> res.status
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "E:\python36\lib\site-packages\celery\result.py", line 394, in state
        return self._get_task_meta()['status']
      File "E:\python36\lib\site-packages\celery\result.py", line 339, in _get_task_meta
        return self._maybe_set_cache(self.backend.get_task_meta(self.id))
      File "E:\python36\lib\site-packages\celery\backends\base.py", line 307, in get_task_meta
        meta = self._get_task_meta_for(task_id)
    AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
    >>>
    

    任务执行后会生成一个task_id,查看任务运行状态,会发现出现异常AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
    这是因为任务运行的结果,需要保存到一个地方backend,但是前面实例化的时候只配置一个broker地址,并没有配置backend地址来接收运行结果

    from celery import Celery
    from celery import shared_task
    # backend接收执行结果
    app = Celery('tasks',
                 broker='redis://ip:6379',
                 backend='redis://ip:6379')
    
    
    @shared_task
    def add(x, y):
        return x + y
    

    重新配置后一定要重启worker监听

    celery -A tasks worker --loglevel=info
    


    在启动日志[config]里面会看到results这一项已经配置成功

    D:\demo\demo\aaa>python
    Python 3.6.6 (v3.6.6:4cf1f54eb7, Jun 27 2018, 03:37:03) [MSC v.1900 64 bit (AMD64)] on win32
    Type "help", "copyright", "credits" or "license" for more information.
    >>> from tasks import add
    >>> res = add.delay(10,15)
    >>> res.task_id
    '5ce249c9-a15b-426a-949b-d1b94bf9f8fa'
    >>>
    >>> res.status
    'SUCCESS'
    >>>
    >>> res.get()
    25
    

    常用的几个属性

    • res.task_id 任务id唯一的,可以根据id拿到结果
    • res.status 任务状态:PENDING、STARTED、RETRY、FAILURE、SUCCESS
    • res.get() 任务运行结果,必须要任务状态是'SUCCESS',才会有运行结果
    • r.successful() 返回布尔值,执行成功返回True
    • r.ready() # 返回布尔值, 任务执行完成, 返回 True, 否则返回 False.
    • r.wait() # 等待任务完成, 返回任务执行结果.
    • r.result # 任务执行结果.
    • r.state # 和res.status一样,任务状态:PENDING, START, SUCCESS

    AsyncResult 获取结果

    当触发一个任务后,会得到一个task_id,但是我们不会一直去查询status状态去获取结果,可能会过一段时间再去看看运行结果。
    那么在已经知道task_id 的情况下,如何去查询状态和结果?可以用到AsyncResult 类

    from celery.result import AsyncResult
    res = AsyncResult(id='5ce249c9-a15b-426a-949b-d1b94bf9f8fa')
    print(res.state)   # 'SUCCESS'
    print(res.get())  # 25
    

    结合django使用,参考前面这篇https://www.cnblogs.com/yoyoketang/p/15422804.html
    更多参考教程https://blog.csdn.net/u010339879/article/details/97691231
    更多参考教程https://www.pianshen.com/article/2176289575/

  • 相关阅读:
    ueditor 编译出错
    C# HttpWebRequest向远程地址Post文件
    C# HttpWebRequest请求远程地址获取返回消息
    windows server 2008 R2 Enterprise 防火墙开启允许远程桌面登录
    .Net C# 泛型序列化和反序列化JavaScriptSerializer
    两种方法比较两个字符串的不同
    JSP九大内置对象及四个作用域
    聚沙成塔
    maven环境变量配置
    C3P0和DBCP的区别
  • 原文地址:https://www.cnblogs.com/yoyoketang/p/15423517.html
Copyright © 2011-2022 走看看