zoukankan      html  css  js  c++  java
  • rabbitmq3.7.3 发布了一个新的 exchange x-random

    direct exchange 同一个 routing key 可以绑定多个 queue,当给这个routing key发消息时,所有 queue 都会投递。这个行为对于一些场景不适用,有时我们希望只有一个 queue 收到消息。x-random 就是解决这个问题的。

    这个 exchange 之前是第三方实现的,现在在官方给的 3.7.3(2018-01-30发布) 已经自带了,但仍需要人工激活。升级 rabbitmq-server 后手工执行:

    rabbitmq-plugins enable rabbitmq_random_exchange

    激活后在 web console 就可以看到这种 exchange 了:

    测试:

    import asyncio
    import traceback
    
    import asynqp
    
    async def test(case_id):
    
        def listen(id):
            def callback(msg):
                print('[{}] [{}] Received: {}'.format(case_id, id, msg.body))
                msg.ack()
            return callback
    
        try:
            connection = await asynqp.connect('localhost', 5672, username='guest', password='guest')
            channel = await connection.open_channel()
            exchange = await channel.declare_exchange('rnd.exchange', 'x-random')
    
            for i in range(0,10):
                queue = await channel.declare_queue('test.queue_%s' % i)
                await queue.bind(exchange, 'routing.key')
                await queue.consume(listen(i))
    
            async def send_msg():
                for i in range(0, 50):
                    msg = asynqp.Message({'hello': '%s world %s, random exchange'%(case_id, i)})
                    exchange.publish(msg, 'test.queue')
                    await asyncio.sleep(1)
    
            asyncio.ensure_future(send_msg())
    
        except Exception as ex:
            traceback.print_exc()
    
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asyncio.wait([test('A'), asyncio.sleep(10)]))

    效果:

    [A] [5] Received: b'{"hello": "A world 0, random exchange"}'
    [A] [6] Received: b'{"hello": "A world 1, random exchange"}'
    [A] [1] Received: b'{"hello": "A world 2, random exchange"}'
    [A] [8] Received: b'{"hello": "A world 3, random exchange"}'
    [A] [3] Received: b'{"hello": "A world 4, random exchange"}'
    [A] [4] Received: b'{"hello": "A world 5, random exchange"}'
    [A] [6] Received: b'{"hello": "A world 6, random exchange"}'
    [A] [8] Received: b'{"hello": "A world 7, random exchange"}'
    [A] [5] Received: b'{"hello": "A world 8, random exchange"}'
    [A] [2] Received: b'{"hello": "A world 9, random exchange"}'

    可见消息会投递到随机选择的某个 queue。

    random-exchange 的代码:https://github.com/rabbitmq/rabbitmq-random-exchange

    注意:publish 提供的 routing key 并没有处理,不论提供什么 routing key  都会对所有绑定到该 exchange 的 queue 推送消息,所以效果相当于 fanout + random。不实用。

    这个

  • 相关阅读:
    Python之路【第十六篇】Django基础
    Python之路【第十五篇】WEB框架
    Python之路【第十四篇】前端补充回顾
    Python之路【第十三篇】jQuery案例-Form表单&插件及扩展
    Python之路【第十二篇续】jQuery案例详解
    Kafka【第一篇】Kafka集群搭建
    Python之路【第十二篇】前端之js&dome&jQuery
    Python之路【第十一篇续】前端之CSS补充
    Python之路【第十一篇续】前端初识之CSS
    Python之路【第十一篇】前端初识之HTML
  • 原文地址:https://www.cnblogs.com/inshua/p/8417021.html
Copyright © 2011-2022 走看看