zoukankan      html  css  js  c++  java
  • Native RabbitMQ RejectMessage

    消费者在消费消息的时候,可以根据业务场景来选择拒绝消息,通过参数设置是否让消息再次入队。拒绝消息的指令有俩,一个是basicNack,一个是basicReject,这两个区别是后者不能批量拒绝,只能单条拒绝。代码演示如下:

    一个生产者,两个正常消费者,一个拒绝消息的消费者,三个消费者共同订阅一个队列,这样生产者发布的消息会轮询被三个消费者消费,演示代码如下:

    生产者

    /**
     * 普通的生产者
     *
     * @author zhangjianbing
     * time 2020/09/04
     */
    @SuppressWarnings("Duplicates")
    public class NormalProducer {
    
        public static final String EXCHANGE_NAME = "reject-exchange";
    
        public static void main(String[] args) throws Exception {
            // 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("1.1.1.1");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("beijing");
            connectionFactory.setPassword("123456");
            connectionFactory.setVirtualHost("beijing");
            // 创建连接
            Connection connection = connectionFactory.newConnection();
            // 创建信道
            final Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false);
    
            String queue = "MESSAGE.CALLBACK.QUEUE";
            // 注意同时设置队列、消息持久化才显得有意义
            channel.queueDeclare(queue, false, false, false, null);
    
            String routeKey = "error";
            /** 将队列和交换器通过路由键绑定,表示此队列只接受error的消息 **/
            channel.queueBind(queue, EXCHANGE_NAME, routeKey);
    
            for (int i = 1; i < 11; i++) {
                String message = "hello_world_" + i;
                channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes());
                System.out.println("生产者发送消息:" + message);
            }
            // 关闭信道和连接
            channel.close();
            connection.close();
        }
    
    }
    

    正常消费者A

    /**
     * 普通消费者A
     *
     * @author zhangjianbing
     * time 2020/09/04
     */
    @SuppressWarnings("Duplicates")
    public class NormalConsumerA {
    
        public static void main(String[] args) throws Exception {
            // 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("10.1.3.37");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("beijing");
            connectionFactory.setPassword("123456");
            connectionFactory.setVirtualHost("beijing");
            // 创建连接
            Connection connection = connectionFactory.newConnection();
            // 创建信道
            final Channel channel = connection.createChannel();
    
            String queue = "MESSAGE.CALLBACK.QUEUE";
            System.out.println("正在等待消息。。。。。。");
    
            /**声明一个消费者**/
            final Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, StandardCharsets.UTF_8);
                    System.out.println("NormalConsumerA收到的消息:" + message);
                    // false 表示单条确认 true 表示确认tag之前所有的消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
    
            /**手动确认模式**/
            channel.basicConsume(queue, false, consumer);
        }
    
    }
    

    正常消费者B

    /**
     * 普通消费者B
     *
     * @author zhangjianbing
     * time 2020/09/04
     */
    @SuppressWarnings("Duplicates")
    public class NormalConsumerB {
    
        public static void main(String[] args) throws Exception {
            // 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("10.1.3.37");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("beijing");
            connectionFactory.setPassword("123456");
            connectionFactory.setVirtualHost("beijing");
            // 创建连接
            Connection connection = connectionFactory.newConnection();
            // 创建信道
            final Channel channel = connection.createChannel();
    
            String queue = "MESSAGE.CALLBACK.QUEUE";
            System.out.println("正在等待消息。。。。。。");
    
            /**声明一个消费者**/
            final Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, StandardCharsets.UTF_8);
                    System.out.println("NormalConsumerB收到的消息:" + message);
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
    
            /**手动确认模式**/
            channel.basicConsume(queue, false, consumer);
        }
    
    }
    

    拒绝消费者C

    /**
     * 拒绝消费者
     *          当多个消费者共同消费同一个队列的时候
     *          此消费者会根据业务需求来拒绝消息,可以选择是否将消息重新入队
     *          重新入队的消息会被打上一个重新入队的标签
     *          重新入队的消息会放在队列的末尾,再次轮询投递给队列上的消费者
     *
     * 拒绝消息有两种方式:
     *          ① basicNack
     *          ② basicReject
     *          这俩唯一的不同就是basicNack有批量拒绝参数
     *
     * @author zhangjianbing
     * time 2020/09/04
     */
    @SuppressWarnings("Duplicates")
    public class RejectConsumer {
    
        public static void main(String[] args) throws Exception {
            // 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("10.1.3.37");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("beijing");
            connectionFactory.setPassword("123456");
            connectionFactory.setVirtualHost("beijing");
            // 创建连接
            Connection connection = connectionFactory.newConnection();
            // 创建信道
            final Channel channel = connection.createChannel();
    
            String queue = "MESSAGE.CALLBACK.QUEUE";
            System.out.println("正在等待消息。。。。。。");
    
            /**声明一个消费者**/
            final Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, StandardCharsets.UTF_8);
                    System.out.println("RejectConsumer收到的消息:" + message);
                    try {
                        throw new RuntimeException("RejectConsumer业务处理发生异常");
                    } catch (Exception re) {
                        System.out.println(re.getMessage() + ",消息重新入队");
                        channel.basicReject(envelope.getDeliveryTag(), true);// 重新入队
                    }
                }
            };
    
            /**手动确认模式**/
            channel.basicConsume(queue, false, consumer);
        }
    
    }
    

    测试结果

    生产者发送10条消息,假设消息投递的顺序是A,B,C

    消费者A:

    正在等待消息。。。。。。
    NormalConsumerA收到的消息:hello_world_1
    NormalConsumerA收到的消息:hello_world_4
    NormalConsumerA收到的消息:hello_world_7
    NormalConsumerA收到的消息:hello_world_10
    NormalConsumerA收到的消息:hello_world_9
    

    消费者B:

    正在等待消息。。。。。。
    NormalConsumerB收到的消息:hello_world_2
    NormalConsumerB收到的消息:hello_world_5
    NormalConsumerB收到的消息:hello_world_8
    NormalConsumerB收到的消息:hello_world_3
    NormalConsumerB收到的消息:hello_world_6
    

    消费者C:

    正在等待消息。。。。。。
    RejectConsumer收到的消息:hello_world_3
    RejectConsumer业务处理发生异常,消息重新入队
    RejectConsumer收到的消息:hello_world_6
    RejectConsumer业务处理发生异常,消息重新入队
    RejectConsumer收到的消息:hello_world_9
    RejectConsumer业务处理发生异常,消息重新入队
    RejectConsumer收到的消息:hello_world_6
    RejectConsumer业务处理发生异常,消息重新入队
    

    消费者C的结果中显示消息6被拒绝了两次,说明被拒绝的消息重新入队后还可以再次被C消费,只是再拒绝一次。

    多个消费者共同消费一个队列,消息首先投递给谁,取决于哪个消费者先启动,这是轮询算法的问题。

  • 相关阅读:
    Python入门-函数进阶
    Python入门-初始函数
    Leetcode300. Longest Increasing Subsequence最长上升子序列
    Leetcode139. Word Break单词拆分
    Leetcode279. Perfect Squares完全平方数
    Leetcode319. Bulb Switcher灯泡开关
    Leetcode322. Coin Change零钱兑换
    二叉树三种遍历两种方法(递归和迭代)
    Leetcode145. Binary Tree Postorder Traversal二叉树的后序遍历
    Leetcode515. Find Largest Value in Each Tree Row在每个树行中找最大值
  • 原文地址:https://www.cnblogs.com/zhangjianbing/p/13618231.html
Copyright © 2011-2022 走看看