zoukankan      html  css  js  c++  java
  • Celery初识及简单实例

      Celery是一个“自带电池”的任务队列。易于使用,可以轻易入门,它遵照最佳实践设计,使产品可以扩展,或与其他语言集成,并且它自带了在生产环境中运行这样一个系统所需的工具和支持。本文介绍基础部分:

    • 选择和安装消息传输方式(中间人)。
    • 安装Celery并创建一个任务
    • 运行职程并调用任务
    • 追踪任务在不同状态间的迁移,并检视返回值

    一、选择中间人

    Celery需要一个发送和接收消息的解决方案,其通常以独立服务形式出现,称为消息中间人。

    可行的选择包括:

    RabbitMQ

    RabbitMQ功能完备、稳定、耐用,并且安装简便,是生产环境的绝佳选择。

    Redis

    Redis也是功能完备的,但更容易受濡染终端或断电地阿莱数据丢失的影响。

    使用数据库

    不推荐把数据库用于消息队列,但对于很小的项目可能是合适的。

    其他中间人

    还有其他实验性传输实现:AmazonSQS、MongoDB和IronMQ。

    二、安装Celery

    Celery提交到Python Package Index上,可以使用Python标准工具pip进行安装:

    $ pip install celery
    

    三、应用

    首先需要一个Celery实例:这个实例用于你想在celery中做一切事,它必须可以被其他模块导入。

    我们简单的放在一个模块中,对于比较大的项目,可以创建一个独立模块。

    创建tasks.py:

    from celery import Celery

    app = Celery('tasks', broker='redis://127.0.0.1:6379/3')


    @app.task
    def add(x, y):
    return x + y

    创建APP的时候,Celery方法的第一个参数是我们当前py文件的名字。broker是我们选择的中间人。上面的代码使用的是redis。

    接下来我们可以先运行我们的工作任务。

    $ celery -A tasks worker --loglevel=info

    使用celery命令,第二个参数'tasks'还是我们的py文件名字,后面是指定日志级别。这条命令必须能找到我们的tasks文件才能运行起来。

    接下来调用我们的任务:

    >>> from tasks import add
    >>> add.delay(1, 3)
    <AsyncResult: 7217dee2-3869-4f5e-8ccc-3a3b3dd6a9d7>
     
    # 这里返回的是任务ID,而不是结果。

    这样我们就实现了一个最简单的celery应用。

    其他更多的参数命令,你可以通过下面的方式了解:

    $ celery worker --help
    
    $ celery help
    

    关于调用我们的任务方式:

    • delay()
    • apply_async()
    # delay()这个方法使用起来更方便一些,它能像调用普通函数一样调用我们的任务
    task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
    
    # 使用apply_async你还需要将参数进行整合
    task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})
    

      

    接下来说一下,在我们的应用中如何使用celery

    项目布局:

    proj/__init__.py
        /celery.py
        /tasks.py
    

    项目中的celery.py

    celery.py

    from __future__ import absolute_import
    from celery import Celery
    
    
    app = Celery('mysite',  # 项目名称
                 broker='redis://127.0.0.1:6379/3',  # 制定我们用的中间人
                 backend='redis://127.0.0.1:6379/5',  # 定义回调中间人
                 include=['proj.tasks'],) 
    
    app.conf.update(
        result_expires=3600  # 设置过期时间
    )
    
    if __name__ == '__main__':
        app.start()
    

      

    task.py

    from __future__ import absolute_import
    from .celery import app
    import time
    
    
    @app.task(bind=True)  # 指定bind=True,当前task函数第一个参数为task本身。
    def add(self, x, y):
        time.sleep(1)
        self.update_state(state="PROGRESS", meta={'progress': 50})
        # task使用update_state修改当前状态可以触发on_message回调函数
        time.sleep(3)
        self.update_state(state="PROGRESS", meta={'progress': 90})
        time.sleep(1)
        print(self.request.chain)
        print(123456)
        self.request.chain = None  # 当使用chain同时调用多个task函数的时候,如果中间需要停止,可以使用self.request.chain = None
        self.update_state(state="PROGRESS", meta={'result': x + y})
        return x + y
    
    
    @app.task(bind=True)
    def mul(self, x, y):
        return x * y
    
    
    @app.task
    def div(x, y):
        return x / y
    

      

    mysite/views.py

    from celery import chain
    
    from django.shortcuts import render, HttpResponse
    
    from celery_work.tasks import add
    from celery_work.tasks import mul
    from celery_work.tasks import div
    from celery_work.tasks import error_handler
    # from celery_work.tasks import error_handler
    
    
    def test(request):
        def on_raw_message(body):
            print(body)
        res = add.apply_async((1, 2))
    
        print(res.get(on_message=on_raw_message, propagate=False))
        # 注意这里使用了res.get(),所以并不是异步返回的,而是等拿到结果之后返回的
        return HttpResponse('ok')
    
    
    # 异步方式处理函数,在add这个task函数中是有睡眠时间的,但是我们在发布任务之后就立即将任务ID返回给客户端了
    def test0(request):
        res = add.apply_async((1, 2))
        return HttpResponse('async: %s' % res)
    
    
    # 没有中断的chain任务
    def test1(request):
        res = (mul.s(1, 2) | mul.s(3) | mul.s(3))()
        # 另一种写法  chain(mul.s(1, 2), mul.s(3), mul.s(3))()
        return HttpResponse(res.get())
    
    
    # 有中断的chain任务,这里需要注意一下,我们在add这个函数中将chain终止了,res永远无法get到返回值
    # 因为res是要取chain任务最后一个task函数的返回值,这样就要使用parent从最后一个task函数往前找,
    # 直到找到终止chain任务的那个task函数,进行get
    def test2(request):
        res = (add.s(1, 2) | mul.s(3) | mul.s(3))()
        print(res.parent.parent.get())
        print(res.parent.parent.successful())
        return HttpResponse('mul')
    

      

      

  • 相关阅读:
    [LintCode] 最长上升子序列
    [LintCode] 最长公共前缀
    [LintCode] A + B 问题
    [hihoCoder] 拓扑排序·一
    [LintCode] 拓扑排序
    [LintCode] 第k大元素
    [LintCode] 最小路径和
    [LeetCode] Factorial Trailing Zeros
    [LintCode] 尾部的零
    [LeetCode] Length of Last Word
  • 原文地址:https://www.cnblogs.com/sxzwj/p/6568960.html
Copyright © 2011-2022 走看看