zoukankan      html  css  js  c++  java
  • RabbitMQ如何保证发送端消息的可靠投递

    消息发布者向RabbitMQ进行消息投递时默认情况下是不返回发布者该条消息在broker中的状态的,也就是说发布者不知道这条消息是否真的抵达RabbitMQ的broker之上,也因此会发生消息丢失的情况。

    对此,RabbitmQ提供了两种解决方案(以官方提供的SDK为例)

    1.通过AMOP提供的事务机制:

    C#代码:

                    try
                    {
                        channel.TxSelect();
                        channel.BasicPublish("yu.exchange", "yu.1", props, msg);
                        channel.TxCommit();
                    }
                    catch (Exception ex)
                    {
                        channel.TxRollback();
                    }

    java代码是一样的操作。。。

            byte[] msg = "hello".getBytes();
            try {
                channel.txSelect();
                channel.basicPublish("yu.exchange", "yu.1", MessageProperties.PERSISTENT_TEXT_PLAIN, msg);
                channel.txCommit();
            } catch (Exception ex) {
                channel.txRollback();
            }

    事务开启,提交,回滚都有了。。。

    2.Conform模式,也就是官网推荐的消息批量提交的方式

    Conform模式主要包含两种编程模式,一种同步的,一种异步通知的:

    同步回调的调用方式与事务模式差不多

    C#代码:

                    channel.ConfirmSelect();
                    byte[] msg = Encoding.UTF8.GetBytes("hello");
                    channel.BasicPublish("yu.exchange1", "yu.1", props, msg);
                    bool success = channel.WaitForConfirms(new TimeSpan(0, 0, 10));
                    Console.WriteLine(success);

    Java代码:

            byte[] msg = "hello".getBytes();
            channel.confirmSelect();
            channel.basicPublish("yu.exchange", "yu.1", MessageProperties.PERSISTENT_TEXT_PLAIN, msg);
            boolean success = channel.waitForConfirms(10);

    通道Channel开启Conform,在发送完消息后可以通过WaitForConfirm等待消息的投递结果,这里有个可选参数,就是阻塞等待的时间,如果返回结果为false则表示消息投递失败,则发送端这时候也就可以采取重试之类的策略了。

    异步回调的方式也就是通道订阅RabbitMQ的发送完毕确认事件,消息投递成功会回调这个方法给发送方,回调的参数包含当前消息在该通道中发送的编号DeliveryTag(批量提交的时候可以根据这个编号对应提交集合的索引,保证对应集合索引上的消息可靠投递),的最大值是9223372036854775807

    C#代码:

               channel.BasicAcks += (sender, eventArgs) =>
                    {
                        ulong tag = eventArgs.DeliveryTag;
                    };
                    channel.BasicReturn += (sender, eventArgs) =>
                    {
                       
                    };
                    byte[] msg = Encoding.UTF8.GetBytes("hello");
                    channel.BasicPublish("yu.exchange1", "yu.1", true, props, msg);

    Java代码:

     channel.addConfirmListener(new ConfirmListener() {
                public void handleAck(long l, boolean b) throws IOException {
                    System.out.println(l);
                }
    
                public void handleNack(long l, boolean b) throws IOException {
                    System.out.println(l);
                }
            });
            channel.addReturnListener(new ReturnListener() {
                public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
                    System.out.println("响应状态码-ReplyCode:"+i);
                    System.out.println("响应内容-ReplyText:"+s);
                    System.out.println("Exchange:"+s1);
                    System.out.println("RouteKey"+s2);
    System.out.println("投递失败的消息:"+ new String(bytes,"UTF-8") );
                }
            });
            byte[] msg = "hello".getBytes();
            channel.confirmSelect();
            channel.basicPublish("yu.exchange1", "yu.1",true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg);

    代码中多订阅了一个BasicReturn事件(addReturnListener),当消息被RabbitMQ拒绝或者说没有成功投递的时候,则会触发这个方法,当然想要获取详细信息则需要设置mandatory参数为true,也就是basicPublish("yu.exchange1", "yu.1",true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg);中的第三个参数。

    队列中新建一个yu.exchange1的交换机然后不绑定队列的情况下则会投递失败的时间;

  • 相关阅读:
    学生分数排序
    union 的一个简单例子,搜狗笔试题
    枚举的初始化赋值
    unsigned 赋值负数输出情况 & printf输出格式
    sizeof中的表达式不执行
    三级指针:百度试题
    生成器案例,#采集日志
    简述装饰器
    装饰器(转载)
    python写监控并发警报邮件
  • 原文地址:https://www.cnblogs.com/ylsforever/p/7773404.html
Copyright © 2011-2022 走看看