zoukankan      html  css  js  c++  java
  • 五、Celery 高级用法【20200912】

    0、获取原生链接后,可以参考官方文档

    https://docs.celeryproject.org/projects/kombu/en/stable/index.html
    Celery,操作MQ,有分为:低层次:py-amqp,高层次:kombu

    1、通过Celery,获取原生的RabbitMQ链接进行操作

    案例:生产者与消费者

    # 生产者
    from django_celery_project import celery_app
    conn = celery_app.broker_connection()
    with conn.channel() as channel:
        producer = Producer(channel)
        from kombu import Exchange, Queue
        task_queue = Queue('tasks', Exchange('tasks'), routing_key='tasks')
    
        producer.publish(
            {'hello': 'world'},
            retry=True,
            exchange=task_queue.exchange,
            routing_key=task_queue.routing_key,
            declare=[task_queue],  # declares exchange, queue and binds.
        )
    
    # 消费者
    def callback(body, message):
        print(body)
        message.ack()
    
    from django_celery_project import celery_app
    conn = celery_app.broker_connection()
    task_queue = Queue('tasks', Exchange('tasks'), routing_key='tasks')
    with conn.channel() as channel:
        consumer = conn.Consumer(queues=task_queue, channel=channel)
        consumer.register_callback(callback)
        with consumer:
            conn.drain_events(timeout=1)

    2、通过Celery获取链接,实现获取队列大小

    获取队列大小的作用,就是在删除队列之前一定在判断队列还有没有数据,有数据的话,不能被删除
        from django_celery_project import celery_app
        broker_connection = celery_app.broker_connection()
        # tasks 是队列名字
        print(broker_connection.channel().basic_get('tasks', no_ack=False).delivery_info)
        print(broker_connection.channel().basic_get('tasks', no_ack=False).headers)

     运行结果

    {'delivery_tag': 1, 'redelivered': True, 'exchange': 'tasks', 'routing_key': 

    'tasks', 'message_count': 5}<== message_count 就是队列的大小,记得再加上1,才是总的数量
    {'content_type': 'application/json', 'content_encoding': 'utf-8', 
    'application_headers': {}, 'delivery_mode': 2, 'priority': 0}

    3、通过Celery获取链接,删除队列和交换接口

    from django_celery_project import celery_app
        broker_connection = celery_app.broker_connection()
        broker_connection.channel().exchange_delete('tasks') # 填写删除的交换接口
        broker_connection.channel().queue_delete('tasks') # 填写删除的队列名字
    # 其它定义交换接口,队列,绑定关系,都在 broker_connection.channel() 进行调用
  • 相关阅读:
    jquery中ajax请求的使用和四个步骤示例
    jzoj6094
    2019.03.27【GDOI2019】模拟 T3
    AGC019F
    浅谈高维前缀和
    刷题清单
    为什么要遍历两次?——个人对于kosaraju算法的理解
    我的黑客和渗透测试学习路线
    一个假猪套神器:NET CAT-NC
    kali linux(二):使用与介绍
  • 原文地址:https://www.cnblogs.com/ygbh/p/13658888.html
Copyright © 2011-2022 走看看