zoukankan      html  css  js  c++  java
  • rabbitmq消息发布



    可靠投递


    mandatory

    当交换器无法路由消息,RabbitMQ将回发Basic.Return消息到发布者,同时回发完整消息。
    Basic.Return是异步的,在消息发布后的任何时候都可能发生。
    在rabbitpy库中,客户端自动接收Basic.Return,并触发异常

    示例程序:发布失败

    import datetime
    import rabbitpy
    
    # Connect to the default URL of amqp://guest:guest@localhost:15672/%2F
    with rabbitpy.Connection() as connection:
        with connection.channel() as channel:
            # Create the message to send
            body = 'server.cpu.utilization 25.5 1350884514'
            message = rabbitpy.Message(channel,
                                       body,
                                       {'content_type': 'text/plain',
                                        'timestamp': datetime.datetime.now(),
                                        'message_type': 'graphite metric'})
    
            # Publish the message to the exchange with the routing key
            # "server-metrics" and make sure it is routed to the exchange
            message.publish('chapter2-example', 'server-metrics', mandatory=True)
    

    示例程序:异常捕获

    import datetime
    import rabbitpy
    
    # Connect to the default URL of amqp://guest:guest@localhost:15672/%2F
    connection = rabbitpy.Connection()
    try:
        with connection.channel() as channel:
            properties = {'content_type': 'text/plain',
                          'timestamp': datetime.datetime.now(),
                          'message_type': 'graphite metric'}
            body = 'server.cpu.utilization 25.5 1350884514'
            message = rabbitpy.Message(channel, body, properties)
            message.publish('chapter2-example',
                            'server-metrics',
                            mandatory=True)
    except rabbitpy.exceptions.MessageReturnedException as error:
        print('Publish failure: %s' % error)
    

    发布者确认

    发布者发送给RabbitMQ的每条消息,服务器发送一个确认(Basic.Ack)或者否认相应(Basic.Nack)。

    示例程序:发布者确认

    import rabbitpy
    
    
    with rabbitpy.Connection() as connection:
        with connection.channel() as channel:
            exchange = rabbitpy.Exchange(channel, 'chapter4-example')
            exchange.declare()
            channel.enable_publisher_confirms()
            message = rabbitpy.Message(channel,
                                       'This is an important message',
                                       {'content_type': 'text/plain',
                                        'message_type': 'very important'})
            if message.publish('chapter4-example', 'important.message'):
                print('The message was confirmed')
    

    rabbitpy中没有使用回调的方法,而是在确认收到之前会一直阻塞。

    备用交换器

    处理无法路由的消息。当不可路由的消息发布到已经定义了备用交换器的交换器中时,它将被路由到备用交换器。

    示例程序:备用交换器

    import rabbitpy
    
    with rabbitpy.Connection() as connection:
        with connection.channel() as channel:
            my_ae = rabbitpy.Exchange(channel, 'my-ae',
                                      exchange_type='fanout')
            my_ae.declare()
            args = {'alternate-exchange': my_ae.name}
            exchange = rabbitpy.Exchange(channel,
                                         'graphite',
                                         exchange_type='topic',
                                         arguments=args)
            exchange.declare()
            queue = rabbitpy.Queue(channel, 'unroutable-messages')
            queue.declare()
            if queue.bind(my_ae, '#'):
                print('Queue bound to alternate-exchange')
    

    基于事务的批量处理

    确保消息投递成功。

    import rabbitpy
    
    with rabbitpy.Connection() as connection:
        with connection.channel() as channel:
            tx = rabbitpy.Tx(channel)
            tx.select()
            message = rabbitpy.Message(channel,
                                       'This is an important message',
                                       {'content_type': 'text/plain',
                                        'delivery_mode': 2,
                                        'message_type': 'important'})
            message.publish('chapter4-example', 'important.message')
            try:
                if tx.commit():
                    print('Transaction committed')
            except rabbitpy.exceptions.NoActiveTransactionError:
                print('Tried to commit without active transaction')
    

    由于错误无法路由消息时,将及时返回Basic.Return。发布者应发送TX.RollbackRPC请求并等待来自代理服务器的TX.Rollback相应,然后继续后续的工作。

    HA队列

    允许队列在多个服务器拥有冗余副本。
    发布者发送消息到集群中的任何节点。RabbitMQ节点同步队列中消息的状态。发布的消息被放入队列,并存储在每台服务器上。

    示例代码:HA队列声明

    import rabbitpy
    
    connection = rabbitpy.Connection()
    try:
        with connection.channel() as channel:
            queue = rabbitpy.Queue(channel,
                                   'my-ha-queue',
                                   arguments={'x-ha-policy': 'all'})
            if queue.declare():
                print('Queue declared')
    except rabbitpy.exceptions.RemoteClosedChannelException as error:
        print('Queue declare failed: %s' % error)
    

    示例代码:HA队列指定节点

    import rabbitpy
    
    connection = rabbitpy.Connection()
    try:
        with connection.channel() as channel:
            arguments = {'x-ha-policy': 'nodes',
                         'x-ha-nodes': ['rabbit@node1',
                                        'rabbit@node2',
                                        'rabbit@node3']}
            queue = rabbitpy.Queue(channel,
                                   'my-2nd-ha-queue',
                                   arguments=arguments)
            if queue.declare():
                print('Queue declared')
    except rabbitpy.exceptions.RemoteClosedChannelException as error:
        print('Queue declare failed: %s' % error)
    

    消息持久化

    delivery-mode=1,保存到内存;delivery-mode=2,保存到磁盘。服务器重启后,消息仍在队列中(队列必须声明为durable)。

    如果rabbitmq经常等待操作系统响应读写请求,消息吞吐量将大大降低。

    rabbitmq回推

    发布者发布消息太快,RabbitMQ发送Channel.Flow RPC方法,阻塞发布者。发布者不能发送消息直到收到另一条Channel.Flow命令。

    示例代码:检测连接状态

    import rabbitpy
    
    connection = rabbitpy.Connection()
    print('Channel is Blocked? %s' % connection.blocked)
    
    
  • 相关阅读:
    Kali之Metasploit生成apk后门控制安卓
    迅雷后台上传?干掉迅雷后台进程和服务的一个批处理
    不用第三方软件–一键开关笔记本电脑wifi热点的批处理
    【PHP】创蓝253云通讯国际短信余额查询请求demo
    【PHP】创蓝253云通讯平台国际短信API接口demo
    创蓝253云通讯平台---短信验证码接口说明
    C++调取国际短信验证码----创蓝253云通讯平台---demo
    手机空号、停机、注销,空号检测为你去除无效号码
    上市公司都被撸垮,羊毛党就真的没有办法解决了吗?
    如何用Ruby调取创蓝253短信验证码
  • 原文地址:https://www.cnblogs.com/hainingwyx/p/13811597.html
Copyright © 2011-2022 走看看