zoukankan      html  css  js  c++  java
  • RabbitMQ如何保证发送端消息的可靠投递

    消息发布者向RabbitMQ进行消息投递时默认情况下是不返回发布者该条消息在broker中的状态的,也就是说发布者不知道这条消息是否真的抵达RabbitMQ的broker之上,也因此会发生消息丢失的情况。

    对此,RabbitmQ提供了两种解决方案(以官方提供的SDK为例)

    1.通过AMOP提供的事务机制:

    C#代码:

                    try
                    {
                        channel.TxSelect();
                        channel.BasicPublish("yu.exchange", "yu.1", props, msg);
                        channel.TxCommit();
                    }
                    catch (Exception ex)
                    {
                        channel.TxRollback();
                    }

    java代码是一样的操作。。。

            byte[] msg = "hello".getBytes();
            try {
                channel.txSelect();
                channel.basicPublish("yu.exchange", "yu.1", MessageProperties.PERSISTENT_TEXT_PLAIN, msg);
                channel.txCommit();
            } catch (Exception ex) {
                channel.txRollback();
            }

    事务开启,提交,回滚都有了。。。

    2.Conform模式,也就是官网推荐的消息批量提交的方式

    Conform模式主要包含两种编程模式,一种同步的,一种异步通知的:

    同步回调的调用方式与事务模式差不多

    C#代码:

                    channel.ConfirmSelect();
                    byte[] msg = Encoding.UTF8.GetBytes("hello");
                    channel.BasicPublish("yu.exchange1", "yu.1", props, msg);
                    bool success = channel.WaitForConfirms(new TimeSpan(0, 0, 10));
                    Console.WriteLine(success);

    Java代码:

            byte[] msg = "hello".getBytes();
            channel.confirmSelect();
            channel.basicPublish("yu.exchange", "yu.1", MessageProperties.PERSISTENT_TEXT_PLAIN, msg);
            boolean success = channel.waitForConfirms(10);

    通道Channel开启Conform,在发送完消息后可以通过WaitForConfirm等待消息的投递结果,这里有个可选参数,就是阻塞等待的时间,如果返回结果为false则表示消息投递失败,则发送端这时候也就可以采取重试之类的策略了。

    异步回调的方式也就是通道订阅RabbitMQ的发送完毕确认事件,消息投递成功会回调这个方法给发送方,回调的参数包含当前消息在该通道中发送的编号DeliveryTag(批量提交的时候可以根据这个编号对应提交集合的索引,保证对应集合索引上的消息可靠投递),的最大值是9223372036854775807

    C#代码:

               channel.BasicAcks += (sender, eventArgs) =>
                    {
                        ulong tag = eventArgs.DeliveryTag;
                    };
                    channel.BasicReturn += (sender, eventArgs) =>
                    {
                       
                    };
                    byte[] msg = Encoding.UTF8.GetBytes("hello");
                    channel.BasicPublish("yu.exchange1", "yu.1", true, props, msg);

    Java代码:

     channel.addConfirmListener(new ConfirmListener() {
                public void handleAck(long l, boolean b) throws IOException {
                    System.out.println(l);
                }
    
                public void handleNack(long l, boolean b) throws IOException {
                    System.out.println(l);
                }
            });
            channel.addReturnListener(new ReturnListener() {
                public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
                    System.out.println("响应状态码-ReplyCode:"+i);
                    System.out.println("响应内容-ReplyText:"+s);
                    System.out.println("Exchange:"+s1);
                    System.out.println("RouteKey"+s2);
    System.out.println("投递失败的消息:"+ new String(bytes,"UTF-8") );
                }
            });
            byte[] msg = "hello".getBytes();
            channel.confirmSelect();
            channel.basicPublish("yu.exchange1", "yu.1",true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg);

    代码中多订阅了一个BasicReturn事件(addReturnListener),当消息被RabbitMQ拒绝或者说没有成功投递的时候,则会触发这个方法,当然想要获取详细信息则需要设置mandatory参数为true,也就是basicPublish("yu.exchange1", "yu.1",true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg);中的第三个参数。

    队列中新建一个yu.exchange1的交换机然后不绑定队列的情况下则会投递失败的时间;

  • 相关阅读:
    使用 libevent 和 libev 提高网络应用性能
    An existing connection was forcibly closed by the remote host
    各种浏览器的兼容css
    vs输出窗口,显示build的时间
    sass
    网站设置404错误页
    List of content management systems
    css footer not displaying at the bottom of the page
    强制刷新css
    sp_executesql invalid object name
  • 原文地址:https://www.cnblogs.com/ylsforever/p/7773404.html
Copyright © 2011-2022 走看看