zoukankan      html  css  js  c++  java
  • 生产者producer消息的投递可靠和消费者consumer的Ack确认(一)

    在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

     

    在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
      confirm 确认模式
      return 退回模式

    rabbitmq 整个消息投递的路径为:
      producer--->rabbitmq broker--->exchange--->queue--->consumer
      消息从 producer 到 exchange 则会返回一个 confirmCallback 。
      消息从 exchange-->queue 投递失败则会返回一个 returnCallback 。
      我们将利用这两个 callback 控制消息的可靠性投递

    rabbitmq-high-producer项目

    pom文件

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.1.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.qingfeng</groupId>
    <artifactId>rabbitmq-high-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq-high-producer</name>
    <description>Demo project for Spring Boot</description>
    <properties>
    <java.version>1.8</java.version>
    </properties>
    <dependencies>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    </dependencies>

    <build>
    <plugins>
    <plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    </plugin>
    </plugins>
    </build>

    </project>

    application.properties文件

    server.port=8081
    
    # ip
    spring.rabbitmq.host=127.0.0.1
    #默认5672
    spring.rabbitmq.port=5672
    #用户名
    spring.rabbitmq.username=guest
    #密码
    spring.rabbitmq.password=guest
    #连接到代理时用的虚拟主机
    spring.rabbitmq.virtual-host=/
    #是否启用【发布确认】,默认false
    #spring.rabbitmq.publisher-confirm-type=correlated替换spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-confirm-type=correlated
    #是否启用【发布返回】,默认false
    spring.rabbitmq.publisher-returns=true
    #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
    spring.rabbitmq.listener.simple.acknowledge-mode=manual

    controller层

    package com.qingfeng.rabbitmqhighproducer.ack.controller;
    
    import com.qingfeng.rabbitmqhighproducer.ack.service.AckRabbitService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.UUID;
    
    /**
     * 路由,根据消息携带的路由键,将消息转发给对应的队列
     */
    @RestController
    @RequestMapping("/ack")
    public class AckDirectSendMessageController {
    
        @Autowired
        private AckRabbitService ackRabbitService;
    
        //http://127.0.0.1:8081/ack/sendDirectMessage
        //producer发送消息confirm 确认模式
        @GetMapping("/sendDirectMessage")
        public String sendDirectMessage() {
            String messageId = String.valueOf(UUID.randomUUID());
            //将消息携带绑定键值:路由键MqConst.ROUTING_MSM_ITEM 发送到交换机MqConst.EXCHANGE_DIRECT_MSM
            ackRabbitService.sendMessageConfirm("ack_direct_exchange","ack_direct_routing",messageId);
            return "ok";
        }
    
        //producer发送消息return  退回模式
        @GetMapping("/sendDirectMessage02")
        public String sendDirectMessage02() {
            String messageId = String.valueOf(UUID.randomUUID());
    
            //将消息携带绑定键值:路由键MqConst.ROUTING_MSM_ITEM 发送到交换机MqConst.EXCHANGE_DIRECT_MSM
            ackRabbitService.sendMessageConfirm("ack_direct_exchange","ack_direct_routing",messageId);
            return "ok";
        }
    
    }
    AckRabbitConfig类
    package com.qingfeng.rabbitmqhighproducer.ack.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * 
     */
    @Configuration
    public class AckRabbitConfig {
    
        //交换机名称
        public static final String ACK_DIRECT_EXCHANGE = "ack_direct_exchange";
        //队列名称
        public static final String ACK_DIRECT__QUEUE = "ack_direct_queue";
        //路由
        public static final String ACK_DIRECT__ROUTING = "ack_direct_routing";
    
        /**
         * 交换机
         */
        @Bean(name = "ackDirectExchange")
        public DirectExchange ackDirectExchange() {
            // 参数意义:
            // name: 名称
            // durable: true
            // autoDelete: 自动删除
            return new DirectExchange(ACK_DIRECT_EXCHANGE, true, false);
        }
    
        /**
         * 队列
         */
        @Bean(name ="ackDirectQueue" )
        public Queue ackDirectQueue() {
            // 参数意义:
            // name: 名称
            // durable: true为持久化
            return new Queue(ACK_DIRECT__QUEUE, true);
        }
    
        /**
         * 绑定队列和交换机
         */
        @Bean
        public Binding bindingDirectAck() {
            return BindingBuilder.bind(ackDirectQueue())
                    .to(ackDirectExchange())
                    .with(ACK_DIRECT__ROUTING);
        }
    
    
    }
    
    
    AckRabbitService类
    package com.qingfeng.rabbitmqhighproducer.ack.service;
    
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import java.util.UUID;
    
    @Service
    public class AckRabbitService implements RabbitTemplate.ConfirmCallback {
     
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 消息从 producer生产者 到 exchange交换机 则会返回一个 confirmCallback 。
         *  producer发送消息confirm 确认模式
         * @param exchange 交换机
         * @param routingKey 路由键
         * @param msg 消息
         */
        public void sendMessageConfirm(String exchange, String routingKey, String msg){
            /**
             * 确认模式:
             * 步骤:
             * 1. 确认模式开启:spring.rabbitmq.publisher-confirm-type=correlated
             * 2. 在rabbitTemplate定义ConfirmCallBack回调函数
             */
            //2. 定义回调
            rabbitTemplate.setConfirmCallback(this);
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            System.out.println("发送的消息为"+msg);
            this.rabbitTemplate.convertAndSend(exchange,routingKey,msg,correlationData);
        }
    
        /**
         *
         * @param correlationData 相关配置信息
         * @param ack   exchange交换机 是否成功收到了消息。true 成功,false代表失败
         * @param cause 失败原因
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println("消息id:" + correlationData.getId());
            if (ack) {
                System.out.println("消息发送确认成功");
            } else {
                System.out.println("消息发送确认失败:" + cause);
            }
        }
    }

     

    消费者Consumer的Ack,表示消费端收到消息后的确认方式

    ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。

      有三种确认方式:   

        自动确认:acknowledge="none"   

        手动确认:acknowledge="manual"   

        根据异常情况确认:acknowledge="auto"

    其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。



    rabbitmq-high-consumer项目

    application.properties文件

    server.port=8082
    
    # ip
    spring.rabbitmq.host=127.0.0.1
    #默认5672
    spring.rabbitmq.port=5672
    #用户名
    spring.rabbitmq.username=guest
    #密码
    spring.rabbitmq.password=guest
    #连接到代理时用的虚拟主机
    spring.rabbitmq.virtual-host=/
    #是否启用【发布确认】,默认false
    #spring.rabbitmq.publisher-confirm-type=correlated替换spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-confirm-type=correlated
    #是否启用【发布返回】,默认false
    spring.rabbitmq.publisher-returns=true
    #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    
    
    AckRabbitConfig类
    package com.qingfeng.rabbitmqhighconsumer.ack;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     *
     */
    @Configuration
    public class AckRabbitConfig {
    
        //交换机名称
        public static final String ACK_DIRECT_EXCHANGE = "ack_direct_exchange";
        //队列名称
        public static final String ACK_DIRECT_QUEUE = "ack_direct_queue";
        //路由
        public static final String ACK_DIRECT_ROUTING = "ack_direct_routing";
    
        /**
         * 交换机
         */
        @Bean(name = "ackDirectExchange")
        public DirectExchange ackDirectExchange() {
            // 参数意义:
            // name: 名称
            // durable: true
            // autoDelete: 自动删除
            return new DirectExchange(ACK_DIRECT_EXCHANGE, true, false);
        }
    
        /**
         * 队列
         */
        @Bean(name ="ackDirectQueue" )
        public Queue ackDirectQueue() {
            // 参数意义:
            // name: 名称
            // durable: true为持久化
            return new Queue(ACK_DIRECT_QUEUE, true);
        }
    
        /**
         * 绑定队列和交换机
         */
        @Bean
        public Binding bindingDirect() {
            return BindingBuilder.bind(ackDirectQueue())
                    .to(ackDirectExchange())
                    .with(ACK_DIRECT_ROUTING);
        }
    
    
    
    }
    AckListener类
    package com.qingfeng.rabbitmqhighconsumer.ack;
    
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Consumer ACK机制:
     *  1. 设置手动签收。 spring.rabbitmq.listener.simple.acknowledge-mode=manual
     *  2. 让监听器类实现ChannelAwareMessageListener接口
     *  3. 如果消息成功处理,则调用channel的 basicAck()签收
     *  4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
     */
    
    @Component
    public class AckListener {
    
        //手动签收
        @RabbitHandler
        @RabbitListener(queues = "ack_direct_queue")
        public void onMessage(Message message, Channel channel) throws Exception {
            //Thread.sleep(1000);
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
    
            try {
                //1.接收转换消息
                System.out.println("接受到的消息为"+new String(message.getBody()));
    
                //2. 处理业务逻辑
                System.out.println("处理业务逻辑...");
               // int i = 1/0;//出现错误
                //3. 手动签收
                channel.basicAck(deliveryTag,true);
            } catch (Exception e) {
                //e.printStackTrace();
    
                //4.拒绝签收
                /*
                第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
                 */
                System.out.println("拒绝签收");
                channel.basicNack(deliveryTag,true,true);
                //channel.basicReject(deliveryTag,true);
            }
        }
    }
    启动两个项目测试:正确的测试
    访问http://127.0.0.1:8081/ack/sendDirectMessage

    rabbitmq-high-provider输出结果
    
    
    rabbitmq-high-consumer输出结果

    测试错误:

    当我们把交换机写错,把正确的ack_direct_exchange改为错误的ack11_direct_exchange在测试http://127.0.0.1:8081/ack/sendDirectMessage

    package com.qingfeng.rabbitmqhighproducer.ack.controller;
    
    import com.qingfeng.rabbitmqhighproducer.ack.service.AckRabbitService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.UUID;
    
    /**
     * 路由,根据消息携带的路由键,将消息转发给对应的队列
     */
    @RestController
    @RequestMapping("/ack")
    public class AckDirectSendMessageController {
    
        @Autowired
        private AckRabbitService ackRabbitService;
    
        //http://127.0.0.1:8081/ack/sendDirectMessage
        //producer发送消息confirm 确认模式
        @GetMapping("/sendDirectMessage")
        public String sendDirectMessage() {
            String messageId = String.valueOf(UUID.randomUUID());
            //将消息携带绑定键值:路由键MqConst.ROUTING_MSM_ITEM 发送到交换机MqConst.EXCHANGE_DIRECT_MSM
            ackRabbitService.sendMessageConfirm("ack11_direct_exchange","ack_direct_routing",messageId);
            return "ok";
        }
    
    
    }
    
    

    测试结果:

    
    

    测试错误:

    
    

    当我们把AckListener 类的int i = 1/0打开,在测试http://127.0.0.1:8081/ack/sendDirectMessage

    
    
    package com.qingfeng.rabbitmqhighconsumer.ack;
    
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Consumer ACK机制:
     *  1. 设置手动签收。 spring.rabbitmq.listener.simple.acknowledge-mode=manual
     *  2. 让监听器类实现ChannelAwareMessageListener接口
     *  3. 如果消息成功处理,则调用channel的 basicAck()签收
     *  4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
     */
    
    @Component
    public class AckListener {
    
        //手动签收
        @RabbitHandler
        @RabbitListener(queues = "ack_direct_queue")
        public void onMessage(Message message, Channel channel) throws Exception {
            //Thread.sleep(1000);
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
    
            try {
                //1.接收转换消息
                System.out.println("接受到的消息为"+new String(message.getBody()));
    
                //2. 处理业务逻辑
                System.out.println("处理业务逻辑...");
               int i = 1/0;//出现错误
                //3. 手动签收
                channel.basicAck(deliveryTag,true);
            } catch (Exception e) {
                //e.printStackTrace();
    
                //4.拒绝签收
                /*
                第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
                 */
                System.out.println("拒绝签收");
                channel.basicNack(deliveryTag,true,true);
                //channel.basicReject(deliveryTag,true);
            }
        }
    }
    
    
    

    测试结果:

    rabbitmq-high-provider输出结果:没有问题
    rabbitmq-high-consumer输出结果:监听业务出错,会拒绝接受消息
     



     


  • 相关阅读:
    Struts2【UI标签、数据回显、资源国际化】
    Struts2【配置】知识要点
    Struts2与Spring整合
    Struts2入门这一篇就够了
    Hibernate最全面试题
    Hibernate【查询、连接池、逆向工程】
    Hibernate【缓存】知识要点
    Hibernate【inverse和cascade属性】知识要点
    输入法设置,SublimeTest,putty掉线
    Hibernate【映射】知识要点
  • 原文地址:https://www.cnblogs.com/Amywangqing/p/14694758.html
Copyright © 2011-2022 走看看