zoukankan      html  css  js  c++  java
  • RabbitMQ事务性消息和确认模式

    事务消息与数据库的事务类似,只是MQ的消息是要保证消息是否会全部发送成功,防止消息丢失的一种策略。

    RabbitMQ有两种策略来解决这个问题:

    1.通过AMQP的事务机制实现

    2.使用发送者确认模式实现

    1.事务

    事务的实现主要是对信道(Channel)的设置,主要方法如下:

    1. channel.txSelect()  声明启动事务模式

    2.channel.txCommit() 提交事务

    3.channel.txRollback()回滚事务

    1.事务性消息发送

    开启事务之后必须手动channel.txCommit();提交或者channel.txRollback();回滚。

    package rabbitmq;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Producer {
        
        public static Connection getConnection() throws Exception {
            // 创建连接工程,下面给出的是默认的case
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.99.100");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
            return factory.newConnection();
        }
    
        public static void main(String[] args) {
            Connection connection = null;
            Channel channel = null;
            try {
                connection = getConnection();
                channel = connection.createChannel();
                /**
                 * 声明一个队列。
                 * 参数一:队列名称
                 * 参数二:是否持久化
                 * 参数三:是否排外  如果排外则这个队列只允许有一个消费者
                 * 参数四:是否自动删除队列,如果为true表示没有消息也没有消费者连接自动删除队列
                 * 参数五:队列的附加属性
                 * 注意:
                 * 1.声明队列时,如果已经存在则放弃声明,如果不存在则会声明一个新队列;
                 * 2.队列名可以任意取值,但需要与消息接收者一致。
                 * 3.下面的代码可有可无,一定在发送消息前确认队列名称已经存在RabbitMQ中,否则消息会发送失败。
                 */
                channel.queueDeclare("myQueue", true, false, false,null);
                
                // 启动事务,必须用txCommit()或者txRollback()回滚
                channel.txSelect();
    
                // 假设这里处理业务逻辑
                String message = "hello,message!";
                /**
                 * 发送消息到MQ
                 * 参数一:交换机名称,为""表示不用交换机
                 * 参数二:为队列名称或者routingKey.当指定了交换机就是routingKey
                 * 参数三:消息的属性信息
                 * 参数四:消息内容的字节数组
                 */
                channel.basicPublish("", "myQueue", null, message.getBytes());
                
                // 提交事务
                channel.txCommit();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    if (channel != null) {
                        // 回滚。如果未异常会提交事务,此时回滚无影响
                        channel.txRollback();
                        channel.close();
                    }
                    if (connection != null) {
                        connection.close();
                    }
                } catch (Exception ignore) {
                    // ignore
                }
            }
        }
    }

      测试可以注释掉提交事务的代码发现mq不会新增消息。

    2.消费者事务测试

      经测试,自动确认模式下。即使事务不提交,也会读取到消息并从队列移除。也就是暂时得出的结论是事务对消费者无效。

    package rabbitmq;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class Consumer {
    
        public static ConnectionFactory getConnectionFactory() {
            // 创建连接工程,下面给出的是默认的case
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.99.100");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
            return factory;
        }
    
        public static void main(String[] args) throws IOException, TimeoutException  {
            ConnectionFactory connectionFactory = getConnectionFactory();
            Connection newConnection = null;
            Channel createChannel = null;
            try {
                newConnection = connectionFactory.newConnection();
                createChannel = newConnection.createChannel();
                /**
                 * 声明一个队列。
                 * 参数一:队列名称
                 * 参数二:是否持久化
                 * 参数三:是否排外  如果排外则这个队列只允许有一个消费者
                 * 参数四:是否自动删除队列,如果为true表示没有消息也没有消费者连接自动删除队列
                 * 参数五:队列的附加属性
                 * 注意:
                 * 1.声明队列时,如果已经存在则放弃声明,如果不存在则会声明一个新队列;
                 * 2.队列名可以任意取值,但需要与消息接收者一致。
                 * 3.下面的代码可有可无,一定在发送消息前确认队列名称已经存在RabbitMQ中,否则消息会发送失败。
                 */
                createChannel.queueDeclare("myQueue", true, false, false,null);
                
                /**
                 * 开启事务
                 * 消费者开启事务后,即使不提交也会获取到消息并且从队列删除。
                 * 结论:
                 *     事务对消费者没有任何影响
                 */
                createChannel.txSelect();
                
                /**
                 * 接收消息。会持续坚挺,不能关闭channel和Connection
                 * 参数一:队列名称
                 * 参数二:消息是否自动确认,true表示自动确认接收完消息以后会自动将消息从队列移除。否则需要手动ack消息
                 * 参数三:消息接收者的标签,用于多个消费者同时监听一个队列时用于确认不同消费者。
                 * 参数四:消息接收者
                 */
                createChannel.basicConsume("myQueue", true, "", new DefaultConsumer(createChannel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                            byte[] body) throws IOException {
                        String string = new String(body, "UTF-8");
                        System.out.println("接收到d消息: -》 " + string);
                    }
                });
                
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
            }
            
        }
    }

      上面是自动确认模式的消费者,不受事务的影响。

    如果是手动确认消息的消费者,在开启事务下,必须手动commit,否则不会移除消息。

    如下:手动确认模式+事务的用法

    package rabbitmq;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    public class Consumer {
    
        public static ConnectionFactory getConnectionFactory() {
            // 创建连接工程,下面给出的是默认的case
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.99.100");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
            return factory;
        }
    
        public static void main(String[] args) throws IOException, TimeoutException  {
            ConnectionFactory connectionFactory = getConnectionFactory();
            Connection newConnection = null;
            Channel createChannel = null;
            try {
                newConnection = connectionFactory.newConnection();
                createChannel = newConnection.createChannel();
                /**
                 * 声明一个队列。
                 * 参数一:队列名称
                 * 参数二:是否持久化
                 * 参数三:是否排外  如果排外则这个队列只允许有一个消费者
                 * 参数四:是否自动删除队列,如果为true表示没有消息也没有消费者连接自动删除队列
                 * 参数五:队列的附加属性
                 * 注意:
                 * 1.声明队列时,如果已经存在则放弃声明,如果不存在则会声明一个新队列;
                 * 2.队列名可以任意取值,但需要与消息接收者一致。
                 * 3.下面的代码可有可无,一定在发送消息前确认队列名称已经存在RabbitMQ中,否则消息会发送失败。
                 */
                createChannel.queueDeclare("myQueue", true, false, false,null);
                
                /**
                 * 开启事务
                 * 消费者开启事务后,即使不提交也会获取到消息并且从队列删除。
                 * 结论:
                 *     如果是手动确认的消费者,开启事物的情况下必须ack之后再commit,否则不会从队列移除
                 */
                createChannel.txSelect();
                
                /**
                 * 接收消息。会持续坚挺,不能关闭channel和Connection
                 * 参数一:队列名称
                 * 参数二:消息是否自动确认,true表示自动确认接收完消息以后会自动将消息从队列移除。否则需要手动ack消息
                 * 参数三:消息接收者的标签,用于多个消费者同时监听一个队列时用于确认不同消费者。
                 * 参数四:消息接收者
                 */
                createChannel.basicConsume("myQueue", false, "", new DefaultConsumer(createChannel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                            byte[] body) throws IOException {
    
                        // 该消息是否已经被处理过,true表示已经处理过,false表示没有处理过
                        boolean redeliver = envelope.isRedeliver();
                        
                        String string = new String(body, "UTF-8");
                        // 获取消息的编号,根据编号确认消息
                        long deliveryTag = envelope.getDeliveryTag();
                        // 获取当前内部类中的通道
                        Channel channel = this.getChannel();
                        System.out.println("处理消息成功, 消息: " + string + "	 redeliver: " + redeliver);
                        
                        // 手动确认
                        channel.basicAck(deliveryTag, true);
                        
                        // 提交事务
                        channel.txCommit();
                    }
                });
                
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
            }
            
        }
    }

    这里envelope.isRedeliver() 可以返回该消息是否已经被处理过。

    2. 确认模式

    Confirm发送方确认模式使用和事务类似,也是通过设置Channel进行发送方确认的。最终确保所有的消息全部发送成功。confirm确认模式要比事务快。

    Confirm的三种实现方式:

    方式一:channel.waitForConfirms()普通发送方确认模式;

    方式二:channel.waitForConfirmsOrDie()批量确认模式;

    方式三:channel.addConfirmListener()异步监听发送方确认模式;

    1. 普通发送方确认模式

    package rabbitmq;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Producer {
        
        public static Connection getConnection() throws Exception {
            // 创建连接工程,下面给出的是默认的case
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.99.100");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
            return factory.newConnection();
        }
    
        public static void main(String[] args) {
            Connection connection = null;
            Channel channel = null;
            try {
                try {
                    connection = getConnection();
                } catch (Exception e) {
                    // ignore
                }
                channel = connection.createChannel();
                channel.queueDeclare("myQueue", true, false, false,null);
                
                // 启动发送者确认模式
                channel.confirmSelect();
        
                String message = "hello,message! confirmSelect";
                /**
                 * 发送消息到MQ
                 * 参数一:交换机名称,为""表示不用交换机
                 * 参数二:为队列名称或者routingKey.当指定了交换机就是routingKey
                 * 参数三:消息的属性信息
                 * 参数四:消息内容的字节数组
                 */
                channel.basicPublish("", "myQueue", null, message.getBytes());
                
                // 阻塞线程,等待服务器返回响应。该方法可以指定一个等待时间,发送成功返回true,否则返回false
                if (channel.waitForConfirms()) {
                    System.out.print("发送成功");
                } else {
                    // 返回false可以进行补发。重试几次发送或者利用redis+定时任务来完成补发
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                // channel.waitForConfirms 可能返回超时异常
                // 可以进行补发。重试几次发送或者利用redis+定时任务来完成补发
            } finally {
                try {
                    if (channel != null) {
                        channel.close();
                    }
                    if (connection != null) {
                        connection.close();
                    }
                } catch (Exception ignore) {
                    // ignore
                }
            }
        }
    }

      这里需要用confirmSelect() 开启确认模式,然后channel.waitForConfirms() 阻塞等待发送。返回false或者抛出InterruptedException中断异常都是发送失败。可以进行补发,可以用重试机制或者先存到redis,随后用定时任务发送。

    2.批量确认模式

      这种用于确认一大批消息模式。但是一旦消息集合有一个没发送成功就会全部失败,需要全部进行补发。

    package rabbitmq;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Producer {
        
        public static Connection getConnection() throws Exception {
            // 创建连接工程,下面给出的是默认的case
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.99.100");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
            return factory.newConnection();
        }
    
        public static void main(String[] args) {
            Connection connection = null;
            Channel channel = null;
            try {
                try {
                    connection = getConnection();
                } catch (Exception e) {
                    // ignore
                }
                channel = connection.createChannel();
                channel.queueDeclare("myQueue", true, false, false,null);
                
                // 启动发送者确认模式
                channel.confirmSelect();
        
                String message = "hello,message! confirmSelect";
                /**
                 * 发送消息到MQ
                 * 参数一:交换机名称,为""表示不用交换机
                 * 参数二:为队列名称或者routingKey.当指定了交换机就是routingKey
                 * 参数三:消息的属性信息
                 * 参数四:消息内容的字节数组
                 */
                channel.basicPublish("", "myQueue", null, message.getBytes());
                channel.basicPublish("", "myQueue", null, message.getBytes());
                channel.basicPublish("", "myQueue", null, message.getBytes());
                channel.basicPublish("", "myQueue", null, message.getBytes());
                
                try {
                    // 阻塞线程,等待服务器返回响应。该方法可以指定一个等待时间。该方法无返回值,只能根据抛出的异常进行判断。
                    channel.waitForConfirmsOrDie();
                } catch (InterruptedException e) {
                    // 可以进行补发。重试几次发送或者利用redis+定时任务来完成补发
                } catch (IOException e) {
                    // 可以进行补发。重试几次发送或者利用redis+定时任务来完成补发                
                }
                System.out.print("发送成功");
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (channel != null) {
                        channel.close();
                    }
                    if (connection != null) {
                        connection.close();
                    }
                } catch (Exception ignore) {
                    // ignore
                }
            }
        }
    }

      这种模式方法无返回值,只能根据异常进行判断。如果确认失败会抛出IOException和InterruptedException。源码如下:

    void waitForConfirmsOrDie() throws IOException, InterruptedException;

    3.异步Confirm模式

      异步模式的优点,就是执行效率高,不需要等待消息执行完,只需要监听消息即可。需要注意的是不可以关闭channel和connection。

    package rabbitmq;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConfirmListener;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Producer {
        
        public static Connection getConnection() throws Exception {
            // 创建连接工程,下面给出的是默认的case
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.99.100");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
            return factory.newConnection();
        }
    
        public static void main(String[] args) {
            Connection connection = null;
            Channel channel = null;
            try {
                try {
                    connection = getConnection();
                } catch (Exception e) {
                    // ignore
                }
                channel = connection.createChannel();
                channel.queueDeclare("myQueue", true, false, false,null);
                
                // 启动发送者确认模式
                channel.confirmSelect();
                
                /**
                 * 发送消息到MQ
                 * 参数一:交换机名称,为""表示不用交换机
                 * 参数二:为队列名称或者routingKey.当指定了交换机就是routingKey
                 * 参数三:消息的属性信息
                 * 参数四:消息内容的字节数组
                 */
                for (int i = 0; i< 500; i ++) {
                    String message = "hello,message! confirmSelect " + i;
                    channel.basicPublish("", "myQueue", null, message.getBytes());
                }
                
                //异步监听确认和未确认的消息
                channel.addConfirmListener(new ConfirmListener() {
                    /**
                     * 消息没有确认的回调方法
                     * 参数一:没有确认的消息的编号
                     * 参数二: 是否没有确认多个
                     */
                    @Override
                    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                        System.out.println(String.format("确认消息,序号:%d,是否多个消息:%b", deliveryTag, multiple));
                    }
                    
                    /**
                     * 消息确认后回调
                     * 参数一: 确认的消息的编号,从1开始递增
                     * 参数二: 当前消息是否同时确认了多个
                     */
                    @Override
                    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                        System.out.println(String.format("确认消息,序号:%d,是否多个消息:%b", deliveryTag, multiple));
                    }
                });
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
    //            try {
    //                if (channel != null) {
    //                    channel.close();
    //                }
    //                if (connection != null) {
    //                    connection.close();
    //                }
    //            } catch (Exception ignore) {
                    // ignore
    //            }
            }
        }
    }

    3.消费者确认模式

      为了保证消息从队列可靠地到达消费者,RabbitMQ提供消息确认机制(Message Acknowledgment)。消费者在声明队列时,可以指定noAck参数,当noAck=false时,rabbitMQ会等待消费者显式发回ack信号后从内存(和磁盘,如果是持久化消息)中删除消息。这里需要注意。如果一个消息设置了手动确认,就必须应答或者拒绝,否则会一直阻塞。

    手动确认主要使用一些方法:

    basicAck(long deliveryTag, boolean multiple):用于肯定确认,multiple参数用于确认多个消息。确认后从队列删除消息。

    basicRecover(bool) :消息重回队列。参数为true表示尽可能的将消息投递给其他消费者消费,而不是自己再次消费。false则表示在睡眠5s后消息重新投递给自己。

    basicReject(long deliveryTag, boolean requeue):接收端告诉服务器这个消息我拒绝接受,可以设置是否回到队列中还是丢弃。true则重新入队列,该消费者还是会消费到该条被reject的消息。false表示丢弃或者进入死信队列。

    basicNack(long deliveryTag, boolean multiple, boolean requeue):可以一次拒绝N条消息,客户端可以设置basicNack()的multiple参数为true。与basicReject()的区别就是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息。nack后的消息也会被自己消费到。

    例如:

    生产者:

    package rabbitmq;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConfirmListener;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Producer {
        
        public static Connection getConnection() throws Exception {
            // 创建连接工程,下面给出的是默认的case
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.99.100");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
            return factory.newConnection();
        }
    
        public static void main(String[] args) {
            Connection connection = null;
            Channel channel = null;
            try {
                try {
                    connection = getConnection();
                } catch (Exception e) {
                    // ignore
                }
                channel = connection.createChannel();
                channel.queueDeclare("myQueue", true, false, false,null);
                
                // 启动发送者确认模式
                channel.confirmSelect();
                
                /**
                 * 发送消息到MQ
                 * 参数一:交换机名称,为""表示不用交换机
                 * 参数二:为队列名称或者routingKey.当指定了交换机就是routingKey
                 * 参数三:消息的属性信息
                 * 参数四:消息内容的字节数组
                 */
                String message = "hello,message! confirmSelect 1";
                channel.basicPublish("", "myQueue", null, message.getBytes());
                
                //异步监听确认和未确认的消息
                channel.waitForConfirms();
            } catch (Exception e) {
                // ignore
            } finally {
                try {
                    if (channel != null) {
                        channel.close();
                    }
                    if (connection != null) {
                        connection.close();
                    }
                } catch (Exception ignore) {
                     // ignore
                }
            }
        }
    }

    (1) 手动应答basicAck,源码如下:

        /**
         * Acknowledge one or several received
         * messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
         * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
         * containing the received message being acknowledged.
         * @see com.rabbitmq.client.AMQP.Basic.Ack
         * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
         * @param multiple true to acknowledge all messages up to and
         * including the supplied delivery tag; false to acknowledge just
         * the supplied delivery tag.
         * @throws java.io.IOException if an error is encountered
         */
        void basicAck(long deliveryTag, boolean multiple) throws IOException;

    测试代码:

    package rabbitmq;
    
    import java.io.IOException;
    import java.util.Date;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    public class Consumer {
    
        public static ConnectionFactory getConnectionFactory() {
            // 创建连接工程,下面给出的是默认的case
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.99.100");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("/");
            return factory;
        }
    
        public static void main(String[] args) throws IOException, TimeoutException  {
            ConnectionFactory connectionFactory = getConnectionFactory();
            Connection newConnection = null;
            Channel createChannel = null;
            try {
                newConnection = connectionFactory.newConnection();
                createChannel = newConnection.createChannel();
                /**
                 * 声明一个队列。
                 * 参数一:队列名称
                 * 参数二:是否持久化
                 * 参数三:是否排外  如果排外则这个队列只允许有一个消费者
                 * 参数四:是否自动删除队列,如果为true表示没有消息也没有消费者连接自动删除队列
                 * 参数五:队列的附加属性
                 * 注意:
                 * 1.声明队列时,如果已经存在则放弃声明,如果不存在则会声明一个新队列;
                 * 2.队列名可以任意取值,但需要与消息接收者一致。
                 * 3.下面的代码可有可无,一定在发送消息前确认队列名称已经存在RabbitMQ中,否则消息会发送失败。
                 */
                createChannel.queueDeclare("myQueue", true, false, false,null);
                
                /**
                 * 接收消息。会持续坚挺,不能关闭channel和Connection
                 * 参数一:队列名称
                 * 参数二:消息是否自动确认,true表示自动确认接收完消息以后会自动将消息从队列移除。否则需要手动ack消息
                 * 参数三:消息接收者的标签,用于多个消费者同时监听一个队列时用于确认不同消费者。
                 * 参数四:消息接收者
                 */
                createChannel.basicConsume("myQueue", false, "", new DefaultConsumer(createChannel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                            byte[] body) throws IOException {
    
                        // 该消息是否已经被处理过,true表示已经处理过,false表示没有处理过
                        boolean redeliver = envelope.isRedeliver();
                        
                        String string = new String(body, "UTF-8");
                        // 获取消息的编号,根据编号确认消息
                        long deliveryTag = envelope.getDeliveryTag();
                        // 获取当前内部类中的通道
                        Channel channel = this.getChannel();
                        System.out.println((new Date()) + "	处理消息成功, 消息: " + string + "	 redeliver: " + redeliver);
                        
                        // 手动确认
                        channel.basicAck(deliveryTag, true);
                    }
                });
                
            } catch (Exception e) {
                e.printStackTrace();
            } 
        }
    }

    生成者生产一条消息后启动消费者,控制台如下:

    Fri Nov 06 22:45:31 CST 2020    处理消息成功, 消息: hello,message! confirmSelect 1     redeliver: false

    从队列查看发现消息变为0,也就是消息被删除

    (2)basicRecover(bool)重新回到队列:true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己。

    源码如下:

        /**
         * Ask the broker to resend unacknowledged messages.  In 0-8
         * basic.recover is asynchronous; in 0-9-1 it is synchronous, and
         * the new, deprecated method basic.recover_async is asynchronous.
         * @param requeue If true, messages will be requeued and possibly
         * delivered to a different consumer. If false, messages will be
         * redelivered to the same consumer.
         */
        Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

    测试

    1》参数为true重新回到队列:尽可能的推给其他消费者

    channel.basicRecover(true);

    结果:(会一直收到这条消息,并且不会删除。当然代码可以根据是否redeliver进行判断)

    Fri Nov 06 22:47:03 CST 2020    处理消息成功, 消息: hello,message! confirmSelect 1     redeliver: false
    Fri Nov 06 22:47:03 CST 2020    处理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true
    Fri Nov 06 22:47:03 CST 2020    处理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true
    Fri Nov 06 22:47:03 CST 2020    处理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true

    2》 参数为false重新投递给自己,只是会进行时间的延迟,推迟五秒后投递。

    Fri Nov 06 22:49:10 CST 2020    处理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true
    Fri Nov 06 22:49:15 CST 2020    处理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true
    Fri Nov 06 22:49:20 CST 2020    处理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true
    Fri Nov 06 22:49:25 CST 2020    处理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true
    Fri Nov 06 22:49:30 CST 2020    处理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true

    (3)basicReject 测试

    源码如下:

        /**
         * Reject a message. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
         * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
         * containing the received message being rejected.
         * @see com.rabbitmq.client.AMQP.Basic.Reject
         * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
         * @param requeue true if the rejected message should be requeued rather than discarded/dead-lettered
         * @throws java.io.IOException if an error is encountered
         */
        void basicReject(long deliveryTag, boolean requeue) throws IOException;

    1》测试第二个参数为true的情况重新回到队列

    channel.basicReject(deliveryTag, true);

    结果:

    Fri Nov 06 23:02:26 CST 2020    处理消息成功, 消息: hello,message! confirmSelect 1     redeliver: false
    Fri Nov 06 23:02:26 CST 2020    处理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true
    Fri Nov 06 23:02:26 CST 2020    处理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true
    Fri Nov 06 23:02:26 CST 2020    处理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true
    Fri Nov 06 23:02:26 CST 2020    处理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true
    Fri Nov 06 23:02:26 CST 2020    处理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true
    Fri Nov 06 23:02:26 CST 2020    处理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true
    Fri Nov 06 23:02:26 CST 2020    处理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true
    。。。

    2》测试第二个参数为false的情况丢弃

                        channel.basicReject(deliveryTag, false);

    结果:(发现消息被丢弃的同时从队列删除)

    Fri Nov 06 23:03:39 CST 2020    处理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true

    (4) basicNack 测试

    源码如下:

        /**
         * Reject one or several received messages.
         *
         * Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
         * or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
         * @see com.rabbitmq.client.AMQP.Basic.Nack
         * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
         * @param multiple true to reject all messages up to and including
         * the supplied delivery tag; false to reject just the supplied
         * delivery tag.
         * @param requeue true if the rejected message(s) should be requeued rather
         * than discarded/dead-lettered
         * @throws java.io.IOException if an error is encountered
         */
        void basicNack(long deliveryTag, boolean multiple, boolean requeue)
                throws IOException;

      测试效果和上面basicReject一样。

  • 相关阅读:
    Android多屏幕适配
    android应用签名详解
    内部类与静态内部类详解
    SpringBoot整合Spring Retry实现重试机制
    行为型模式之模板方法模式
    行为型模式之操作复杂对象结构(访问者模式)
    行为型模式之算法的封装与切换(策略模式)
    行为型模式之处理对象的多种状态及其相互转换(状态模式)
    行为型模式之对象间的联动(观察者模式)
    行为型模式之撤销功能的实现(备忘录模式)
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/13934573.html
Copyright © 2011-2022 走看看