zoukankan      html  css  js  c++  java
  • kombu操作RabbitMQ带优先级例子

    Kombu是一个使用AMQP协议的Python库

    安装Kombu

    pip install kombu


    Producer
    import time
    from kombu.entity import Exchange, Queue
    from kombu.messaging import Producer
    from kombu.connection import Connection
    
    with Connection('amqp://guest:guest@127.0.0.1:5672/test') as connection:
        with connection.channel() as channel:
            for i in range(1, 10):
                science_news = Queue(name='kombu_queue',
                                     exchange=Exchange('kombu_queue', type='direct'),
                                     routing_key='kombu_queue',
                                     channel=channel,
                                     durable=False,
                                     )
                science_news.declare()
                producer = Producer(channel, serializer='json', routing_key='kombu_queue')
                producer.publish({'name': 'kombu_queue', 'size': i})
    
                science_news = Queue(name='kombu_queue_1',
                                     exchange=Exchange('kombu_queue_1', type='direct'),
                                     routing_key='kombu_queue_1',
                                     channel=channel,
                                     max_priority=10,  # 优先级
                                     durable=False,
                                     )
                science_news.declare()
                producer = Producer(channel, serializer='json', routing_key='kombu_queue_1')
                producer.publish({'name': 'kombu_queue_1', 'size': i}, priority=i)
    consume
    from kombu.entity import Exchange, Queue
    from kombu.messaging import Consumer
    from kombu.connection import Connection
    
    
    def test_1(body, message):
        print(body)
        print("my_consume_1")
        message.ack()
    
    
    def test_2(body, message):
        print(message)
        print("my_consume_2")
    
    
    def test_3(body, message):
        print("my_consume_3")
    
    
    with Connection('amqp://guest:guest@127.0.0.1:5672/test') as connection:
        with connection.channel() as channel:
            kombu_queue = Queue(name='kombu_queue',
                                exchange=Exchange('kombu_queue', type='direct'),
                                routing_key='kombu_queue',
                                durable=False,
                                channel=channel,
                                )
            kombu_queue_1 = Queue(name='kombu_queue_1',
                                  exchange=Exchange('kombu_queue_1', type='direct'),
                                  routing_key='kombu_queue_1',
                                  durable=False,
                                  channel=channel,
                                  max_priority=10,  # 优先级
                                  )
            # 消费
            consumer = Consumer(channel,
                                queues=[kombu_queue, kombu_queue_1],  # 多个队列
                                accept=['json', 'pickle', 'msgpack', 'yaml'],  # 多种类型
                                callbacks=[test_1, test_2, test_3]  # 多个回调
                                )
            consumer.consume()
    
            while True:
                import time
    
                connection.drain_events()
  • 相关阅读:
    spring boot 若依系统整合Ueditor,部署时候上传图片错误解决
    JVM学习笔记之栈区
    据说这个是可以撸到2089年的idea2020.2
    小程序监听屏幕滑动事件
    小程序bindinput和bindblur赋值延迟问题解决
    小程序文件下载并保存文件名打开
    数据结构
    Spring JPA 自定义删改
    Spring JPA 查询创建
    Spring JPA 拓展
  • 原文地址:https://www.cnblogs.com/clbao/p/12843971.html
Copyright © 2011-2022 走看看