zoukankan      html  css  js  c++  java
  • RabbitMQ消息发布时的权衡

    在进行本篇文章的学习之前,你需要先阅读 https://www.cnblogs.com/duanjt/p/10057330.html。以便对Java访问RabbitMQ的基础用法有所了解。

    一、失败通知

    基于前面的讲解,如果消息通过交换器发送到指定的路由键,而这个路由键却没有被队列绑定,那么这条消息就会被丢弃。从这个角度来说消息的可靠性就比较低。为了增强可靠性,于是引入了失败通知的机制。在生产者发送消息到RabbitMQ的时候,如果路由键没有被队列绑定就将回调一个函数,让消费者能够知晓数据发送失败,从而做一些处理(注意:发送成功不会回调)。
    具体代码如下:

    public class Productor {
        public static String EXCHANGE_NAME = "zd_exchange";
        public static String ROUTEKEY = "zd_data_list1";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("172.23.88.116");
            factory.setPort(5672);
            factory.setUsername("zhangxueliang");
            factory.setPassword("zhangxueliang");
            factory.setVirtualHost("/");
    
            Connection conn = factory.newConnection();
            Channel channel = conn.createChannel();
    
            // 创建交换器
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
            // 创建失败通知
            channel.addReturnListener(new ReturnListener() {
    
                @Override
                public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                    System.out.println("ERROR DATA:" + new String(body));
                    System.out.println("replyText:" + replyText);
                    System.out.println("replyCode:" + replyCode);
                    System.out.println("================");
                }
            });
    
            // 发送数据
            for (int i = 0; i < 3; i++) {
                String msg = "Hello world " + i;
                channel.basicPublish(EXCHANGE_NAME, ROUTEKEY, true, null, msg.getBytes());
                System.out.println("send:" + msg);
            }
    
            Thread.sleep(2000);//暂停2秒,以便接收回调。因为通道关闭后就不能接收到回调消息了
    
            channel.close();
            conn.close();
        }
    }

    注意:

    1.我们需要在发送消息的时候指定mandatory=true(channel.basicPublish的第三个参数)
    2.我们需要通过channel增加一个返回监听,replyText表示错误原因,body表示发送的消息内容。

    二、事务


    1.channel.txSelect()声明启动事务模式;
    2.channel.txComment()提交事务;
    3.channel.txRollback()回滚事务;
    说明:由于事务会严重影响效率,所以我们一般不使用,而是用发送方确认模式来保证数据的可靠性。

    三、发送方确认模式


    发送方确认模式和事务还是有一定的区别
    比如发送10条数据--
    事务能保证在第5条发送错误的时候就回滚,RabbitMQ里面没有一条数据。
    而发送者确认模式在第5条发送错误的时候会抛出一个异常,生产者就能抓住这个异常从而得知没有全部成功,但是已经发送的数据却已经存在于RabbitMQ了。
    发送方确认有3种方式。2种同步,1种异步
    方式一:channel.waitForConfirms()普通发送方确认模式;消息到达交换器,就会返回true。
    方式二:channel.waitForConfirmsOrDie()批量确认模式;使用同步方式等所有的消息发送之后才会执行后面代码,只要有一个消息未到达交换器就会抛出IOException异常。
    方式三:channel.addConfirmListener()异步监听发送方确认模式
    【注意】必须要先启动发送者确认模式channel.confirmSelect();
    具体代码:

    public class Productor {
        public static String EXCHANGE_NAME = "zd_exchange";
        public static String ROUTEKEY = "zd_data_list";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("172.23.88.116");
            factory.setPort(5672);
            factory.setUsername("zhangxueliang");
            factory.setPassword("zhangxueliang");
            factory.setVirtualHost("/");
    
            Connection conn = factory.newConnection();
            Channel channel = conn.createChannel();
    
            // 创建交换器
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
            // 创建失败通知
            channel.addReturnListener(new ReturnListener() {
                @Override
                public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                    System.out.println("handleReturn==>" + replyText);
                }
            });
    
            // 异步确认
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("handleNack==>" + deliveryTag + "----" + multiple);
                }
    
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("handleAck==>" + deliveryTag + "----" + multiple);
                }
            });
    
            // 启用发送确认模式
            channel.confirmSelect();
    
            // 发送数据
            String[] routekeys = new String[] { "zd_data_list", "zd_data_list2", "zd_data_list3" };
            for (int i = 0; i < 3; i++) {
                String msg = "Hello world " + i;
                channel.basicPublish(EXCHANGE_NAME, routekeys[i % 3], true, null, msg.getBytes());
                System.out.println("send:" + msg);
    
                // 同步单条确认
                //channel.waitForConfirms();
            }
            // 同步批量确认
            // channel.waitForConfirmsOrDie();
    
            Thread.sleep(2000);// 暂停2秒,以便接收回调。因为通道关闭后就不能接收到回调消息了
    
            channel.close();
            conn.close();
        }
    }

    注意:

    1.发送者确认模式只是告诉生产者是否发送成功,至于成功和失败后的具体处理还是代码自己实现。
    2.同步单条确认channel.waitForConfirms()会返回是否成功。同步批量确认channel.waitForConfirmsOrDie()可通过异常来判断是否成功。
    3.异步确认channel.addConfirmListener()。有两个参数(long deliveryTag, boolean multiple)。第一个参数表示消息的Id,第二个参数表示是否为批量。如果第二个参数为true,那么编号<=deliveryTag的所有消息都确认了。
    4.发送者确认模式和失败通知可以一起使用。无论消息的路由键是否有队列绑定,都会返回Ack表示成功。而如果没有队列绑定路由键,同时又启用了失败通知,那么还会在调用channel.addConfirmListener()之前去调用channel.addReturnListener()

    四、备用交换器


    备用交换器从本质上说也就是一个普通交换器。当消息发送到普通交换器而没有队列绑定路由键的时候。该消息就会发送到备用交换器上面。如果备用交换器上面也没得队列绑定,那么这条消息也将被丢弃(如果配置了失败通知,那么将回调失败通知方法channel.addReturnListener())
    核心代码:

    // 创建主交换器
    Map<String, Object> argsMap = new HashMap<String, Object>();
    argsMap.put("alternate-exchange", BACK_EXCHANGE_NAME);// 定义主交换器的备用交换器名称
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, argsMap);
    // 创建备用交换器
    channel.exchangeDeclare(BACK_EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
  • 相关阅读:
    .net程序调用检测和性能分析工具——DotTrace
    HR系统邮件审批功能总结
    添加AD验证(域身份验证)到现有网站
    【事务】:数据库事务隔离级别、脏读、幻读、不可重复读
    【TensorFlow】:解决TensorFlow的ImportError: DLL load failed: 动态链接库(DLL)初始化例程失败
    【Anaconda】:科学计算的Python发行版
    【Junit4】:要点随笔
    【ElasticSearch】:elasticsearch.yml配置
    【ElasticSearch】:Windows下ElasticSearch+版本安装head
    ArrayList、Vector、HashMap、HashTable、HashSet的默认初始容量、加载因子、扩容增量
  • 原文地址:https://www.cnblogs.com/duanjt/p/10075308.html
Copyright © 2011-2022 走看看