zoukankan      html  css  js  c++  java
  • Celery多队列配置

    Celery多队列配置

    Celery官方文档

    项目结构

    /proj
    -__init__
    -app.py                        #实例化celery对象
    -celeryconfig.py               #celery的配置文件
    -tasks.py                      #celery编写任务文件

    app.py

    #coding:utf-8
    from __future__ import absolute_import
    from celery import Celery
    
    app = Celery('proj', include=['proj.tasks'])     #实例化celery对象
    
    app.config_from_object('proj.celeryconfig')      #引入配置文件
    
    if __name__ == '__main__':                       
        app.start()
    • proj参数为celery的名字
    • include参数为启动时导入的模块列表

     tasks.py

    #coding:utf-8
    from __future__ import absolute_import
    
    from proj.app import app
    @app.task()
    def add(x, y):
        return x + y

    celeryconfig.py

    #coding:utf-8
    from kombu import Queue
    
    BROKER_URL = 'amqp://guest:guest@127.0.0.1:5672//' # 使用RabbitMQ作为消息代理
    
    
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 把任务结果存在了Redis
    
    CELERY_TASK_SERIALIZER = 'msgpack' # 任务序列化和反序列化使用msgpack方案
    
    CELERY_RESULT_SERIALIZER = 'json' # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
    
    CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显
    
    CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接受的内容类型
    
    CELERY_QUEUES = (    #设置add队列,绑定routing_key
        Queue('add', routing_key='xue.add'),
    )
    
    
    CELERY_ROUTES = {   #projq.tasks.add这个任务进去add队列并routeing_key为xue.add  
        'projq.tasks.add': { 
            'queue': 'add',
            'routing_key': 'xue.add',
        }
    }
    • CELERY_ACCEPT_CONTENT的类型msgpack为是一种比json更小更快的类型,如果用需要安装相对应的包。
    • CELERY_QUEUES设置一个指定routing_key的队列,这个名字可以任意指定。
    • CELERY_ROUTES设置路由,对指定的任务名,指定对应的队列和routing_key,注意,这里的routing_key需要和上面参数的一致。

    启动

    在proj的上层目录输入

    celery -A proj.app worker -Q add -l info
    • proj.tasks.add为任务名称,也就是在CELERY_ROUTES设置的那个名称
    • add是设置的queue,key=xue.add是设置的routing_key

     

    发布任务

    from proj.tasks import add
    
    add.delay(2,3)

    多队列中需要修改的地方

    CELERY_QUEUES = (    #设置add队列,绑定routing_key
        Queue('add', routing_key='xue.add'),
    )
    
    
    CELERY_ROUTES = {   #projq.tasks.add这个任务进去add队列并routeing_key为xue.add  
        'projq.tasks.add': { 
            'queue': 'add',
            'routing_key': 'xue.add',
        }

     配置两个队列

    # 配置队列
    CELERY_QUEUES = (
        Queue('default', routing_key='default'),
        Queue('队列1', routing_key='key1'),
        Queue('队列2',  routing_key='key2'),
    )
     
    # 路由(哪个任务放入哪个队列)
    CELERY_ROUTES = {
        '任务1': {'queue': '队列1', 'routing_key': 'key1'},
        '任务2': {'queue': '对列2', 'routing_key': 'key2'},
    }
    关于Celery的监控与管理向导




  • 相关阅读:
    HDU4529 郑厂长系列故事——N骑士问题 —— 状压DP
    POJ1185 炮兵阵地 —— 状压DP
    BZOJ1415 聪聪和可可 —— 期望 记忆化搜索
    TopCoder SRM420 Div1 RedIsGood —— 期望
    LightOJ
    LightOJ
    后缀数组小结
    URAL
    POJ3581 Sequence —— 后缀数组
    hdu 5269 ZYB loves Xor I
  • 原文地址:https://www.cnblogs.com/-wenli/p/10976614.html
Copyright © 2011-2022 走看看