zoukankan      html  css  js  c++  java
  • RabbitMQ消息发布时的权衡

    在进行本篇文章的学习之前,你需要先阅读 https://www.cnblogs.com/duanjt/p/10057330.html。以便对Java访问RabbitMQ的基础用法有所了解。

    一、失败通知

    基于前面的讲解,如果消息通过交换器发送到指定的路由键,而这个路由键却没有被队列绑定,那么这条消息就会被丢弃。从这个角度来说消息的可靠性就比较低。为了增强可靠性,于是引入了失败通知的机制。在生产者发送消息到RabbitMQ的时候,如果路由键没有被队列绑定就将回调一个函数,让消费者能够知晓数据发送失败,从而做一些处理(注意:发送成功不会回调)。
    具体代码如下:

    public class Productor {
        public static String EXCHANGE_NAME = "zd_exchange";
        public static String ROUTEKEY = "zd_data_list1";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("172.23.88.116");
            factory.setPort(5672);
            factory.setUsername("zhangxueliang");
            factory.setPassword("zhangxueliang");
            factory.setVirtualHost("/");
    
            Connection conn = factory.newConnection();
            Channel channel = conn.createChannel();
    
            // 创建交换器
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
            // 创建失败通知
            channel.addReturnListener(new ReturnListener() {
    
                @Override
                public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                    System.out.println("ERROR DATA:" + new String(body));
                    System.out.println("replyText:" + replyText);
                    System.out.println("replyCode:" + replyCode);
                    System.out.println("================");
                }
            });
    
            // 发送数据
            for (int i = 0; i < 3; i++) {
                String msg = "Hello world " + i;
                channel.basicPublish(EXCHANGE_NAME, ROUTEKEY, true, null, msg.getBytes());
                System.out.println("send:" + msg);
            }
    
            Thread.sleep(2000);//暂停2秒,以便接收回调。因为通道关闭后就不能接收到回调消息了
    
            channel.close();
            conn.close();
        }
    }

    注意:

    1.我们需要在发送消息的时候指定mandatory=true(channel.basicPublish的第三个参数)
    2.我们需要通过channel增加一个返回监听,replyText表示错误原因,body表示发送的消息内容。

    二、事务


    1.channel.txSelect()声明启动事务模式;
    2.channel.txComment()提交事务;
    3.channel.txRollback()回滚事务;
    说明:由于事务会严重影响效率,所以我们一般不使用,而是用发送方确认模式来保证数据的可靠性。

    三、发送方确认模式


    发送方确认模式和事务还是有一定的区别
    比如发送10条数据--
    事务能保证在第5条发送错误的时候就回滚,RabbitMQ里面没有一条数据。
    而发送者确认模式在第5条发送错误的时候会抛出一个异常,生产者就能抓住这个异常从而得知没有全部成功,但是已经发送的数据却已经存在于RabbitMQ了。
    发送方确认有3种方式。2种同步,1种异步
    方式一:channel.waitForConfirms()普通发送方确认模式;消息到达交换器,就会返回true。
    方式二:channel.waitForConfirmsOrDie()批量确认模式;使用同步方式等所有的消息发送之后才会执行后面代码,只要有一个消息未到达交换器就会抛出IOException异常。
    方式三:channel.addConfirmListener()异步监听发送方确认模式
    【注意】必须要先启动发送者确认模式channel.confirmSelect();
    具体代码:

    public class Productor {
        public static String EXCHANGE_NAME = "zd_exchange";
        public static String ROUTEKEY = "zd_data_list";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("172.23.88.116");
            factory.setPort(5672);
            factory.setUsername("zhangxueliang");
            factory.setPassword("zhangxueliang");
            factory.setVirtualHost("/");
    
            Connection conn = factory.newConnection();
            Channel channel = conn.createChannel();
    
            // 创建交换器
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
            // 创建失败通知
            channel.addReturnListener(new ReturnListener() {
                @Override
                public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                    System.out.println("handleReturn==>" + replyText);
                }
            });
    
            // 异步确认
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("handleNack==>" + deliveryTag + "----" + multiple);
                }
    
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("handleAck==>" + deliveryTag + "----" + multiple);
                }
            });
    
            // 启用发送确认模式
            channel.confirmSelect();
    
            // 发送数据
            String[] routekeys = new String[] { "zd_data_list", "zd_data_list2", "zd_data_list3" };
            for (int i = 0; i < 3; i++) {
                String msg = "Hello world " + i;
                channel.basicPublish(EXCHANGE_NAME, routekeys[i % 3], true, null, msg.getBytes());
                System.out.println("send:" + msg);
    
                // 同步单条确认
                //channel.waitForConfirms();
            }
            // 同步批量确认
            // channel.waitForConfirmsOrDie();
    
            Thread.sleep(2000);// 暂停2秒,以便接收回调。因为通道关闭后就不能接收到回调消息了
    
            channel.close();
            conn.close();
        }
    }

    注意:

    1.发送者确认模式只是告诉生产者是否发送成功,至于成功和失败后的具体处理还是代码自己实现。
    2.同步单条确认channel.waitForConfirms()会返回是否成功。同步批量确认channel.waitForConfirmsOrDie()可通过异常来判断是否成功。
    3.异步确认channel.addConfirmListener()。有两个参数(long deliveryTag, boolean multiple)。第一个参数表示消息的Id,第二个参数表示是否为批量。如果第二个参数为true,那么编号<=deliveryTag的所有消息都确认了。
    4.发送者确认模式和失败通知可以一起使用。无论消息的路由键是否有队列绑定,都会返回Ack表示成功。而如果没有队列绑定路由键,同时又启用了失败通知,那么还会在调用channel.addConfirmListener()之前去调用channel.addReturnListener()

    四、备用交换器


    备用交换器从本质上说也就是一个普通交换器。当消息发送到普通交换器而没有队列绑定路由键的时候。该消息就会发送到备用交换器上面。如果备用交换器上面也没得队列绑定,那么这条消息也将被丢弃(如果配置了失败通知,那么将回调失败通知方法channel.addReturnListener())
    核心代码:

    // 创建主交换器
    Map<String, Object> argsMap = new HashMap<String, Object>();
    argsMap.put("alternate-exchange", BACK_EXCHANGE_NAME);// 定义主交换器的备用交换器名称
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, argsMap);
    // 创建备用交换器
    channel.exchangeDeclare(BACK_EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
  • 相关阅读:
    古谚、评论与论断、名篇与名言
    重读《西游记》
    重读《西游记》
    命名之法 —— 时间、季节、地点
    命名之法 —— 时间、季节、地点
    文言的理解 —— 古时的称谓、别称、别名
    文言的理解 —— 古时的称谓、别称、别名
    Oracle GoldenGate for Oracle 11g to PostgreSQL 9.2.4 Configuration
    瀑布 敏捷 文档
    POJ 1325 ZOJ 1364 最小覆盖点集
  • 原文地址:https://www.cnblogs.com/duanjt/p/10075308.html
Copyright © 2011-2022 走看看