zoukankan      html  css  js  c++  java
  • rabbitmq消息队列,消息发送失败,消息持久化,消费者处理失败相关

    转:https://blog.csdn.net/u014373554/article/details/92686063

    项目是使用springboot项目开发的,前是代码实现,后面有分析发送消息失败、消息持久化、消费者失败处理方法和发送消息解决方法及手动确认的模式

    先引入pom.xml

    <!--rabbitmq-->
    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    application 配置文件

    spring:
    rabbitmq:
      host: IP地址
      port: 5672
      username: 用户名
      password: 密码
    
    RabbitConfig配置文件
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.beans.factory.config.ConfigurableBeanFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Scope;
    
    
    /**
     Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,
     Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
     Queue:消息的载体,每个消息都会被投到一个或多个队列。
     Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
     Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
     vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
     Producer:消息生产者,就是投递消息的程序.
     Consumer:消息消费者,就是接受消息的程序.
     Channel:消息通道,在客户端的每个连接里,可建立多个channel.
    */
    @Configuration
    @Slf4j
    public class RabbitConfig {
    
        @Value("${spring.rabbitmq.host}")
        private String host;
    
        @Value("${spring.rabbitmq.port}")
        private int port;
    
        @Value("${spring.rabbitmq.username}")
        private String username;
    
        @Value("${spring.rabbitmq.password}")
        private String password;
    
        public static final String EXCHANGE_A = "my_mq_exchange_A";
        public static final String EXCHANGE_B = "my_mq_exchange_B";
        public static final String EXCHANGE_C = "my_mq_exchange_C";
    
        public static final String QUEUE_A="QUEUE_A";
        public static final String QUEUE_B="QUEUE_B";
        public static final String QUEUE_C="QUEUE_C";
    
    
        public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";
        public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";
        public static final String ROUTINGKEY_C = "spring-boot-routingKey_C";
    
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setPublisherConfirms(true); //设置发送消息失败重试
            connectionFactory.setChannelCacheSize(100);//解决多线程发送消息
    
            return connectionFactory;
        }
        @Bean
        @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
        public RabbitTemplate rabbitTemplate(){
            RabbitTemplate template = new RabbitTemplate(connectionFactory());
            template.setMandatory(true); //设置发送消息失败重试
            return template;
    
        }
        //配置使用json转递数据
        @Bean
        public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
            return new Jackson2JsonMessageConverter();
        }
        /*public SimpleMessageListenerContainer messageListenerContainer(){
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
    
            MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageHandler());
            adapter.setDefaultListenerMethod(new Jackson2JsonMessageConverter());
            return container;
        }*/
    
        /**
         * 针对消费者配置
         * 1. 设置交换机类型
         * 2. 将队列绑定到交换机
         * FanoutExchange: 将消息分发到所有的绑定队列,无 routingkey的概念
         * HeadersExchange: 通过添加属性key - value匹配
         * DirectExchange: 按照routingkey分发到指定队列
         * TopicExchange : 多关键字匹配
         * @return
         */
        @Bean
        public DirectExchange defaultExchange(){
            return new DirectExchange(EXCHANGE_A,true,false);
        }
    
        @Bean
        public Queue queueA(){
            return  new Queue(QUEUE_A,true);// 队列持久化
        }
    
        @Bean
        public Queue queueB(){
            return  new Queue(QUEUE_B,true);// 队列持久化
        }
    
        /**
         * 一个交换机可以绑定多个消息队列,也就是消息通过一个交换机,可以分发到不同的队列当中去。
         * @return
         */
        @Bean
        public Binding binding(){
            return BindingBuilder.bind( queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
        }
    
        @Bean
        public Binding bindingB(){
            return BindingBuilder.bind( queueB()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
        }
    
    }

    生成者

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import java.util.UUID;
    
    /**
     * 生产者
     */
    @Component
    @Slf4j
    public class ProducerMessage implements  RabbitTemplate.ConfirmCallback , RabbitTemplate.ReturnCallback{
    
        private RabbitTemplate rabbitTemplate;
    
        @Autowired
        public ProducerMessage(RabbitTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
            rabbitTemplate.setConfirmCallback(this::confirm); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容
            rabbitTemplate.setReturnCallback(this::returnedMessage);
            rabbitTemplate.setMandatory(true);
        }
    
        public void  sendMsg (Object content){
            CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A,RabbitConfig.ROUTINGKEY_A,content,correlationId);
    
        }
    
        /**
         * 消息发送到队列中,进行消息确认
         * @param correlationData
         * @param ack
         * @param cause
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            log.info(" 消息确认的id: " + correlationData);
            if(ack){
                log.info("消息发送成功");
                //发送成功 删除本地数据库存的消息
            }else{
                log.info("消息发送失败:id "+ correlationData +"消息发送失败的原因"+ cause);
                // 根据本地消息的状态为失败,可以用定时任务去处理数据
    
            }
        }
    
        /**
         * 消息发送失败返回监控
         * @param message
         * @param i
         * @param s
         * @param s1
         * @param s2
         */
        @Override
        public void returnedMessage(Message message, int i, String s, String s1, String s2) {
            log.info("returnedMessage [消息从交换机到队列失败]  message:"+message);
    
        }
    }

    消费者

    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import net.sf.json.JSONObject;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    import java.io.IOException;
    
    /**
     * 消费者
     */
    
    @Slf4j
    @Component
    
    public class ComsumerMessage {
    
        @RabbitListener(queues = RabbitConfig.QUEUE_A)
        public void handleMessage(Message message,Channel channel) throws  IOException{
            try {
                String json = new String(message.getBody());
                JSONObject jsonObject = JSONObject.fromObject(json);
                log.info("消息了【】handleMessage" +  json);
                int i = 1/0;
                //业务处理。
                /**
                 * 防止重复消费,可以根据传过来的唯一ID先判断缓存数据中是否有数据
                 * 1、有数据则不消费,直接应答处理
                 * 2、缓存没有数据,则进行消费处理数据,处理完后手动应答
                 * 3、如果消息 处理异常则,可以存入数据库中,手动处理(可以增加短信和邮件提醒功能)
                 */
    
                //手动应答
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            }catch (Exception e){
                log.error("消费消息失败了【】error:"+ message.getBody());
                log.error("OrderConsumer  handleMessage {} , error:",message,e);
                // 处理消息失败,将消息重新放回队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
            }
    
        }
    
    }

    发送消息:调用生成的方法

    import com.zz.blog.BlogApplicationTests;
    import com.zz.blog.mq.ProducerMessage;
    import net.sf.json.JSONObject;
    import org.junit.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import java.util.UUID;
    public class Message extends BlogApplicationTests {
        @Autowired
        private ProducerMessage producerMessage;
    
        @Test
        public void sendMessage(){
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("id", UUID.randomUUID().toString());
            jsonObject.put("name","TEST");
            jsonObject.put("desc","订单已生成");
            //防止发送消息失败,将发送消息存入本地。
    
            producerMessage.sendMsg(jsonObject.toString());
    
        }
    }
    

    rabbitTemplate的发送消息流程是这样的:
    1 发送数据并返回(不确认rabbitmq服务器已成功接收)
    2 异步的接收从rabbitmq返回的ack确认信息
    3 收到ack后调用confirmCallback函数
    注意:在confirmCallback中是没有原message的,所以无法在这个函数中调用重发,confirmCallback只有一个通知的作用

    在这种情况下,如果在2,3步中任何时候切断连接,我们都无法确认数据是否真的已经成功发送出去,从而造成数据丢失的问题。

    最完美的解决方案只有1种:
    使用rabbitmq的事务机制。
    但是在这种情况下,rabbitmq的效率极低,每秒钟处理的message在几百条左右。实在不可取。

    基于上面的分析,我们使用一种新的方式来做到数据的不丢失。
    在rabbitTemplate异步确认的基础上
    1 在本地缓存已发送的message
    2 通过confirmCallback或者被确认的ack,将被确认的message从本地删除
    3 定时扫描本地的message,如果大于一定时间未被确认,则重发

    当然了,这种解决方式也有一定的问题
    想象这种场景,rabbitmq接收到了消息,在发送ack确认时,网络断了,造成客户端没有收到ack,重发消息。(相比于丢失消息,重发消息要好解决的多,我们可以在consumer端做到幂等)。

    消息存入本地:在message 发消息的写数据库中。

    消息应答成功,则删除本地消息,失败更改消息状态,可以使用定时任务去处理。

    消息持久化:

    消费者: 

    /**
     * 防止重复消费,可以根据传过来的唯一ID先判断缓存数据库中是否有数据
     * 1、有数据则不消费,直接应答处理
     * 2、缓存没有数据,则进行消费处理数据,处理完后手动应答
     * 3、如果消息 处理异常则,可以存入数据库中,手动处理(可以增加短信和邮件提醒功能)
     */

  • 相关阅读:
    asp.net mvc上传图片案例
    kafka 常用参数
    play framework 笔记
    调试 kafka manager 源码
    kafka AdminClient 闲时关闭连接
    kafka 心跳和 rebalance
    kafka producer batch 发送消息
    kafka producer 发送消息简介
    zk 的配额
    kafka consumer 指定 offset,进行消息回溯
  • 原文地址:https://www.cnblogs.com/duende99/p/11597619.html
Copyright © 2011-2022 走看看