zoukankan      html  css  js  c++  java
  • 五、Celery 高级用法【20200912】

    0、获取原生链接后,可以参考官方文档

    https://docs.celeryproject.org/projects/kombu/en/stable/index.html
    Celery,操作MQ,有分为:低层次:py-amqp,高层次:kombu

    1、通过Celery,获取原生的RabbitMQ链接进行操作

    案例:生产者与消费者

    # 生产者
    from django_celery_project import celery_app
    conn = celery_app.broker_connection()
    with conn.channel() as channel:
        producer = Producer(channel)
        from kombu import Exchange, Queue
        task_queue = Queue('tasks', Exchange('tasks'), routing_key='tasks')
    
        producer.publish(
            {'hello': 'world'},
            retry=True,
            exchange=task_queue.exchange,
            routing_key=task_queue.routing_key,
            declare=[task_queue],  # declares exchange, queue and binds.
        )
    
    # 消费者
    def callback(body, message):
        print(body)
        message.ack()
    
    from django_celery_project import celery_app
    conn = celery_app.broker_connection()
    task_queue = Queue('tasks', Exchange('tasks'), routing_key='tasks')
    with conn.channel() as channel:
        consumer = conn.Consumer(queues=task_queue, channel=channel)
        consumer.register_callback(callback)
        with consumer:
            conn.drain_events(timeout=1)

    2、通过Celery获取链接,实现获取队列大小

    获取队列大小的作用,就是在删除队列之前一定在判断队列还有没有数据,有数据的话,不能被删除
        from django_celery_project import celery_app
        broker_connection = celery_app.broker_connection()
        # tasks 是队列名字
        print(broker_connection.channel().basic_get('tasks', no_ack=False).delivery_info)
        print(broker_connection.channel().basic_get('tasks', no_ack=False).headers)

     运行结果

    {'delivery_tag': 1, 'redelivered': True, 'exchange': 'tasks', 'routing_key': 

    'tasks', 'message_count': 5}<== message_count 就是队列的大小,记得再加上1,才是总的数量
    {'content_type': 'application/json', 'content_encoding': 'utf-8', 
    'application_headers': {}, 'delivery_mode': 2, 'priority': 0}

    3、通过Celery获取链接,删除队列和交换接口

    from django_celery_project import celery_app
        broker_connection = celery_app.broker_connection()
        broker_connection.channel().exchange_delete('tasks') # 填写删除的交换接口
        broker_connection.channel().queue_delete('tasks') # 填写删除的队列名字
    # 其它定义交换接口,队列,绑定关系,都在 broker_connection.channel() 进行调用
  • 相关阅读:
    BZOJ1954 Pku3764 The xor-longest Path
    BZOJ3697 采药人的路径
    BZOJ1468 Tree
    BZOJ2326 [HNOI2011]数学作业
    BZOJ2809 [Apio2012]dispatching
    BZOJ1334 [Baltic2008]Elect
    BZOJ2882 工艺
    BZOJ3791 作业
    BZOJ1224 [HNOI2002]彩票
    [noip2013]花匠
  • 原文地址:https://www.cnblogs.com/ygbh/p/13658888.html
Copyright © 2011-2022 走看看