  • Using Kombu with RabbitMQ: a message priority example

    Kombu is a Python messaging library that speaks the AMQP protocol.

    Install Kombu

    pip install kombu


    Producer
    from kombu.entity import Exchange, Queue
    from kombu.messaging import Producer
    from kombu.connection import Connection
    
    with Connection('amqp://guest:guest@127.0.0.1:5672/test') as connection:
        with connection.channel() as channel:
            # Plain queue without priority support.
            plain_exchange = Exchange('kombu_queue', type='direct')
            plain_queue = Queue(name='kombu_queue',
                                exchange=plain_exchange,
                                routing_key='kombu_queue',
                                channel=channel,
                                durable=False,
                                )
            plain_queue.declare()
            plain_producer = Producer(channel, exchange=plain_exchange,
                                      serializer='json', routing_key='kombu_queue')
    
            # Priority queue: max_priority sets the x-max-priority queue argument.
            priority_exchange = Exchange('kombu_queue_1', type='direct')
            priority_queue = Queue(name='kombu_queue_1',
                                   exchange=priority_exchange,
                                   routing_key='kombu_queue_1',
                                   channel=channel,
                                   max_priority=10,  # highest priority the queue supports
                                   durable=False,
                                   )
            priority_queue.declare()
            priority_producer = Producer(channel, exchange=priority_exchange,
                                         serializer='json', routing_key='kombu_queue_1')
    
            # Declare once, then publish nine messages to each queue.
            for i in range(1, 10):
                plain_producer.publish({'name': 'kombu_queue', 'size': i})
                # The message priority equals the loop counter (1 to 9).
                priority_producer.publish({'name': 'kombu_queue_1', 'size': i}, priority=i)
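
    The producer above publishes nine messages to kombu_queue_1 with priorities 1 to 9. If those messages are waiting in the queue before a consumer attaches, a priority queue hands out the highest priority first, and messages of equal priority keep their arrival order. The sketch below is a toy in-memory model of that ordering, not RabbitMQ's or Kombu's implementation; the class PriorityQueueModel and its methods are hypothetical names used only for illustration. It also assumes that a priority above the queue maximum is treated as the maximum.

```python
import heapq
import itertools


class PriorityQueueModel:
    """Toy model of a queue declared with a maximum priority (hypothetical)."""

    def __init__(self, max_priority):
        self.max_priority = max_priority
        self._heap = []
        # Arrival counter: breaks ties so equal priorities stay first-in, first-out.
        self._arrival = itertools.count()

    def publish(self, body, priority=0):
        # Assumption: priorities above the queue maximum count as the maximum.
        effective = min(priority, self.max_priority)
        # heapq is a min-heap, so the priority is negated to pop the highest first.
        heapq.heappush(self._heap, (-effective, next(self._arrival), body))

    def drain(self):
        # Yield message bodies in delivery order until the queue is empty.
        while self._heap:
            _, _, body = heapq.heappop(self._heap)
            yield body


queue = PriorityQueueModel(max_priority=10)
for i in range(1, 10):
    queue.publish({'name': 'kombu_queue_1', 'size': i}, priority=i)

delivery_order = [body['size'] for body in queue.drain()]
print(delivery_order)  # [9, 8, 7, 6, 5, 4, 3, 2, 1]
```

    With a real broker the reordering is only visible when messages accumulate in the queue. A consumer that is already running and keeping up receives each message as it arrives.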
    Consumer
    from kombu.entity import Exchange, Queue
    from kombu.messaging import Consumer
    from kombu.connection import Connection
    
    
    def test_1(body, message):
        # Print the decoded payload and acknowledge the message.
        print(body)
        print("my_consume_1")
        message.ack()
    
    
    def test_2(body, message):
        # Print the message object itself; test_1 has already acknowledged it.
        print(message)
        print("my_consume_2")
    
    
    def test_3(body, message):
        print("my_consume_3")
    
    
    with Connection('amqp://guest:guest@127.0.0.1:5672/test') as connection:
        with connection.channel() as channel:
            kombu_queue = Queue(name='kombu_queue',
                                exchange=Exchange('kombu_queue', type='direct'),
                                routing_key='kombu_queue',
                                durable=False,
                                channel=channel,
                                )
            kombu_queue_1 = Queue(name='kombu_queue_1',
                                  exchange=Exchange('kombu_queue_1', type='direct'),
                                  routing_key='kombu_queue_1',
                                  durable=False,
                                  channel=channel,
                                  max_priority=10,  # must match the producer's declaration
                                  )
            # Consume
            consumer = Consumer(channel,
                                queues=[kombu_queue, kombu_queue_1],  # several queues
                                accept=['json', 'pickle', 'msgpack', 'yaml'],  # accepted content types
                                callbacks=[test_1, test_2, test_3]  # several callbacks, called in order
                                )
            consumer.consume()
    
            # Block and dispatch incoming messages until the process is interrupted.
            while True:
                connection.drain_events()
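
    The consumer above registers three callbacks, and each delivered message is passed to all of them in list order, which is why only the first one acknowledges it. The sketch below models that dispatch without a broker. FakeMessage, dispatch, and the three callback names are hypothetical stand-ins (they mirror test_1, test_2 and test_3), not Kombu classes or functions, and the double-acknowledgement error is modelled with a plain RuntimeError.

```python
class FakeMessage:
    """Hypothetical stand-in for a delivered message; not kombu's Message class."""

    def __init__(self, body):
        self.body = body
        self.acknowledged = False

    def ack(self):
        # In this model, acknowledging the same message twice is an error.
        if self.acknowledged:
            raise RuntimeError('message already acknowledged')
        self.acknowledged = True


def dispatch(message, callbacks):
    # Call every callback in list order with (body, message).
    for callback in callbacks:
        callback(message.body, message)


calls = []


def ack_callback(body, message):
    # Mirrors test_1: the only callback that acknowledges.
    calls.append('my_consume_1')
    message.ack()


def log_callback(body, message):
    # Mirrors test_2: inspects the message but does not acknowledge it.
    calls.append('my_consume_2')


def noop_callback(body, message):
    # Mirrors test_3.
    calls.append('my_consume_3')


message = FakeMessage({'name': 'kombu_queue', 'size': 1})
dispatch(message, [ack_callback, log_callback, noop_callback])
print(calls, message.acknowledged)
```

    Keeping the acknowledgement in a single callback avoids a second ack on the same message, which this model rejects.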
  • Original article: https://www.cnblogs.com/clbao/p/12843971.html