zoukankan      html  css  js  c++  java
  • celery+RabbitMQ 实战记录2—工程化使用

    上篇文章中,已经介绍了celery和RabbitMQ的安装以及基本用法。

    本文将从工程的角度介绍如何使用celery。

    1.配置和启动RabbitMQ

    请参考celery+RabbitMQ实战记录

    2. 安装和使用celery

    2.1 创建虚拟环境,并安装celery

    $ mkdir celery_demo
    $ cd celery_demo
    $ virtualenv -p python3 venv3
    
    $ ./venv3/bin/pip install celery
    

    项目的目录结构说明:

    -- celery_demo
        -- api.py
        -- celeryconfig.py
        -- rocket
            -- celery.py
            -- tasks.py
    

    2.2 配置celery

    创建配置文件 celeryconfig.py,里面包含CELERY_IMPORTSBROKER_URLCELERYD_LOG_FORMATCELERY_ROUTES.

    # celeryconfig.py
    
    RABBIT_MQ = {
        'HOST': '127.0.0.1',
        'PORT': 5672,
        'USER': 'test',
        'PASSWORD': '123456'
    }
    
    CELERY_IMPORTS = ("rocket.tasks", )
    
    BROKER_URL = 'amqp://%s:%s@%s:%s/myvhost' % (RABBIT_MQ['USER'], RABBIT_MQ['PASSWORD'], RABBIT_MQ['HOST'], RABBIT_MQ['PORT'])
    
    CELERYD_LOG_FORMAT = '[%(asctime)s] [%(levelname)s] %(message)s'
    
    CELERY_ROUTES = {
            'rocket.tasks.add': {'queue': 'sunday'},
    }
    
    

    其中,参数定义如下:

    • CELERY_IMPORTS 导入的task

    • BROKER_URL指定了broker信息,即消息队列的地址。

    • CELERYD_LOG_FORMAT 指定了日志格式。

    • CELERY_ROUTES 指定了路由信息,即调用rocket.tasks.add后,消息具体放入哪个队列,这里是队列名称为sunday

    2.3 创建celery实例

    创建目录

    mkdir rocket
    

    在rocket目录下,创建文件celery.py

    from celery import Celery
    
    app = Celery("orange", backend='amqp')
    app.config_from_object("celeryconfig")
    
    

    创建实例,并读取配置。

    2.4 定义tasks

    在rocket目录下,创建文件tasks.py

    from rocket.celery import app
    
    
    @app.task
    def add(x, y):
            return x + y
    

    定义tasks。

    2.5 启动celery(即消费者)

    确认当前所在的目录:

    $ pwd
    /workspace/celery_demo
    

    启动消费者

    $ ./venv3/bin/celery worker  -A rocket -Q sunday --loglevel=info  -f app.log
    
    /workspace/celery_demo/venv3/lib/python3.6/site-packages/celery/backends/amqp.py:67: CPendingDeprecationWarning:
        The AMQP result backend is scheduled for deprecation in     version 4.0 and removal in version v5.0.     Please use RPC backend or a persistent backend.
    
      alternative='Please use RPC backend or a persistent backend.')
    
    celery@admindeMacBook-Pro-2.local v4.3.0 (rhubarb)
    
    Darwin-18.2.0-x86_64-i386-64bit 2019-04-22 20:53:56
    
    [config]
    .> app:         orange:0x10abdb6d8
    .> transport:   amqp://test:**@127.0.0.1:5672/myvhost
    .> results:     amqp://
    .> concurrency: 8 (prefork)
    .> task events: OFF (enable -E to monitor tasks in this worker)
    
    [queues]
    .> sunday           exchange=sunday(direct) key=sunday
    
    
    [tasks]
      . rocket.tasks.add
    

    2.6 生产者

    创建文件api.py,会调用apply_async向队列中投放消息。

    # api.py
    from rocket.tasks import add
    
    
    print("start...")
    
    result = add.apply_async((1, 2), expires=10)
    
    print("result:", result)
    print(result.ready())
    
    
    print("end...")
    
    

    执行api.py,启动生产者

    ./venv3/bin/python api.py
    start...
    result: 6ef453e6-1797-4dd7-a2fc-bd6ef8096fde
    True
    end...
    

    2.6 查看结果

    查看日志文件app.log

    [2019-04-23 09:53:46,406] [INFO] Connected to amqp://test:**@127.0.0.1:5672/myvhost
    [2019-04-23 09:53:46,420] [INFO] mingle: searching for neighbors
    [2019-04-23 09:53:47,455] [INFO] mingle: all alone
    [2019-04-23 09:53:47,474] [INFO] celery@admindeMacBook-Pro-2.local ready.
    [2019-04-23 09:53:55,850] [INFO] Received task: rocket.tasks.add[6ef453e6-1797-4dd7-a2fc-bd6ef8096fde]   expires:[2019-04-23 01:54:05.817853+00:00]
    [2019-04-23 09:53:55,876] [INFO] Task rocket.tasks.add[6ef453e6-1797-4dd7-a2fc-bd6ef8096fde] succeeded in 0.023095715092495084s: 3
    

    参考

    using celery in project

  • 相关阅读:
    第四次实验报告
    第三次实验报告
    循环结构课后反思
    第二次实验报告
    第一次实验报告1
    第一次作业
    第二次实验报告 总结
    第九章实验报告
    第八章实验报告
    第六次实验报告
  • 原文地址:https://www.cnblogs.com/lanyangsh/p/10754685.html
Copyright © 2011-2022 走看看