zoukankan      html  css  js  c++  java
  • kombu操作RabbitMQ带优先级例子

    Kombu是一个使用AMQP协议的Python库

    安装Kombu

    pip install kombu


    Producer
    import time
    from kombu.entity import Exchange, Queue
    from kombu.messaging import Producer
    from kombu.connection import Connection
    
    with Connection('amqp://guest:guest@127.0.0.1:5672/test') as connection:
        with connection.channel() as channel:
            for i in range(1, 10):
                science_news = Queue(name='kombu_queue',
                                     exchange=Exchange('kombu_queue', type='direct'),
                                     routing_key='kombu_queue',
                                     channel=channel,
                                     durable=False,
                                     )
                science_news.declare()
                producer = Producer(channel, serializer='json', routing_key='kombu_queue')
                producer.publish({'name': 'kombu_queue', 'size': i})
    
                science_news = Queue(name='kombu_queue_1',
                                     exchange=Exchange('kombu_queue_1', type='direct'),
                                     routing_key='kombu_queue_1',
                                     channel=channel,
                                     max_priority=10,  # 优先级
                                     durable=False,
                                     )
                science_news.declare()
                producer = Producer(channel, serializer='json', routing_key='kombu_queue_1')
                producer.publish({'name': 'kombu_queue_1', 'size': i}, priority=i)
    consume
    from kombu.entity import Exchange, Queue
    from kombu.messaging import Consumer
    from kombu.connection import Connection
    
    
    def test_1(body, message):
        print(body)
        print("my_consume_1")
        message.ack()
    
    
    def test_2(body, message):
        print(message)
        print("my_consume_2")
    
    
    def test_3(body, message):
        print("my_consume_3")
    
    
    with Connection('amqp://guest:guest@127.0.0.1:5672/test') as connection:
        with connection.channel() as channel:
            kombu_queue = Queue(name='kombu_queue',
                                exchange=Exchange('kombu_queue', type='direct'),
                                routing_key='kombu_queue',
                                durable=False,
                                channel=channel,
                                )
            kombu_queue_1 = Queue(name='kombu_queue_1',
                                  exchange=Exchange('kombu_queue_1', type='direct'),
                                  routing_key='kombu_queue_1',
                                  durable=False,
                                  channel=channel,
                                  max_priority=10,  # 优先级
                                  )
            # 消费
            consumer = Consumer(channel,
                                queues=[kombu_queue, kombu_queue_1],  # 多个队列
                                accept=['json', 'pickle', 'msgpack', 'yaml'],  # 多种类型
                                callbacks=[test_1, test_2, test_3]  # 多个回调
                                )
            consumer.consume()
    
            while True:
                import time
    
                connection.drain_events()
  • 相关阅读:
    迭代器模式-Iterator
    kubernetes之一 k8s基本概念
    多线程中的异常处理
    MySQL高可用集群方案
    RabbitMQ的集群模式
    多线程中如何取消任务
    公钥、私钥、摘要、数字签名、证书
    将Ubuntu系统迁移到SSD固态操作
    Java线程的状态和状态转换
    Ubuntu18.04系统和软件安装记录
  • 原文地址:https://www.cnblogs.com/clbao/p/12843971.html
Copyright © 2011-2022 走看看