zoukankan      html  css  js  c++  java
  • StringBoot集成Rabbit Redis和ack机制双重保险,保障消息一定能够正确的消费

    转: StringBoot集成Rabbit,根据业务返回ACK

    为了维护消息的有效性,当消费消息时候处理失败时候,不进行消费,需要我们根据业务区返回ACK,本项目我使用Redis和ack机制双重保险,保障消息一定能够正确的消费
    • 首先,接着上部分内容,使用Topic,机制(不明白的,可以回顾上部分内容)

    • 上部分内容,我们使用SpringBoot注解,去实现,但是控制权不完全账务,当进行大规模项目时候,不太建议使用

    
     @RabbitListener(queues = TopicRabbitConfig.USER_QUEUE)
        @RabbitHandler
        public void processUser(String message) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    logger.info("用户侧流水:{}",message);
                }
            });
        }
    
    • 根据源码分析,当然这里不分析源码,有兴趣的可以多失败几次就ok明白了

    • 在配置类中定义监听器,监听这个序列(AcknowledgeMode.MANUAL是必须的哦)

    
        /**
         * 接受消息的监听,这个监听客户交易流水的消息
         * 针对消费者配置
         * @return
         */
        @Bean
        public SimpleMessageListenerContainer messageContainer1(ConnectionFactory connectionFactory, TransactionConsumeImpl transactionConsume) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            container.setQueues(queueMessage());
            container.setExposeListenerChannel(true);
            container.setMaxConcurrentConsumers(1);
            container.setConcurrentConsumers(1);
            container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
            container.setMessageListener(transactionConsume);
            return container;
        }
    

    这个 TransactionConsumeImpl 要继承ChannelAwareMessageListener,主要说的手动返回ACK就是channel。调用

    
    @Component
    public class TransactionConsumeImpl implements ChannelAwareMessageListener {
        private static final Logger logger = LoggerFactory.getLogger(TransactionConsumeImpl.class);
        private static final Gson gson = new Gson();
        @Autowired
        JedisShardInfo jedisShardInfo;
        @Autowired
        ExecutorService threadPool;
        @Autowired
        BoluomeFlowService boluomeFlowService;
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            String boby = new String(message.getBody(), "utf-8");//转换消息,我们是使用json数据格式
            threadPool.execute(new Runnable() {   //多线程处理
                @Override
                public void run() {
                    Jedis jedis = jedisShardInfo.createResource();
                    jedis.sadd(TopicRabbitConfig.TRANSACTION_QUEUE, boby);//添加到key为当前消息类型的集合里面,防止丢失消息
                    BoluomeFlow flow = gson.fromJson(boby, BoluomeFlow.class);
                    String json = gson.toJson(flow);
                    if (boluomeFlowService.insert(flow)) {  //当添加成功时候返回成功
                        logger.info("客户交易流水添加1条记录:{}", json);
                        jedis.srem(TopicRabbitConfig.TRANSACTION_QUEUE, boby);//从当前消息类型集合中移除已经消费过的消息
                        try {
                            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//手工返回ACK,通知此消息已经争取消费
                        } catch (IOException ie) {
                            logger.error("消费成功回调成功,io操作异常");
                        }
                    } else {
                        logger.info("客户交易流水添加失败记录:{}", json);
                    }
                }
            });
        }
    }
    
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // ack返回false,并重新回到队列,api里面解释得很清楚
    • channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒绝消息
      • true 发送给下一个消费者
      • false 谁都不接受,从队列中删除
    Rabbitmq进阶
    import com.rabbitmq.client.Channel;
    import config.callback.ConfirmCallBackListener;
    import config.callback.ReturnCallBackListener;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.annotation.EnableRabbit;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.messaging.handler.annotation.Payload;
    import po.Mail;
    import rabbitMQ.listener.TransactionConsumerImpl;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    //连接rabbitMQ的基本配置
    @Configuration
    @ComponentScan(basePackages = {"rabbitMQ.listener","config.callback"})
    @EnableRabbit
    public class RabbitConfig {

    private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
    @Autowired
    private TransactionConsumerImpl transactionConsumer;
    @Autowired
    private ConfirmCallBackListener confirmCallBackListener;
    @Autowired
    private ReturnCallBackListener returnCallBackListener;

    @Bean
    public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost("mq.xxx.cn");
    connectionFactory.setUsername("admin");
    connectionFactory.setPassword("admin");
    connectionFactory.setPort(5672);
    //connectionFactory.setPublisherConfirms(true);
    return connectionFactory;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
    return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
    /**
    * 使用消息队列,必须要考虑的问题就是生产者消息发送失败和消费者消息处理失败,这两种情况怎么处理.
    生产者发送消息,成功,则确认消息发送成功;失败,则返回消息发送失败信息,再做处理.
    消费者处理消息,成功,则消息队列自动删除消息;失败,则消息重新返回队列,等待处理.
    对于消费者处理失败的情况,如果仅仅只是让消息重新返回队列,等待处理,那么久有可能会出现很多消息一直无法处理的情况;
    因此,是否让消息返回队列,还有待商榷.
    **/

    //消息确认监听器confirmCallBackListener:
    rabbitTemplate.setConfirmCallback(confirmCallBackListener);
    //消息发送失败返回监听器returnCallBackListener:
    rabbitTemplate.setReturnCallback(returnCallBackListener);
    // mandatory必须设置true,return callback才生效
    rabbitTemplate.setMandatory(true);
    return rabbitTemplate;
    }
    @Bean
    public Queue payQueue() {
    //第二个参数 durable=true 表示 持久化
    Queue queue=new Queue("payQueue",true);
    return queue;
    }

    /**
    * 初始化线程池 多线程执行 消费任务
    */
    @Bean
    public ExecutorService threadPool(){
    return Executors.newFixedThreadPool(20);
    }

    /**
    * 对于消费端,我们可以只创建 SimpleRabbitListenerContainerFactory,
    * 它能够帮我们生成 RabbitListenerContainer,然后我们再使用
    * @RabbitListener 指定接收者收到信息时处理的方法。
    */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setConcurrentConsumers(3);
    factory.setMaxConcurrentConsumers(10);
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return factory;
    }

    //消费者配置 @RabbitListener 和 @Bean SimpleMessageListenerContainer 方式

    /**
    * 针对消费者配置
    * 方式一 每一个Queue 对应一个SimpleMessageListenerContainer
    * 指定消息接受监听器 MessageListener implements ChannelAwareMessageListener 接口 自己实现onMessage方法
    *
    * 作用: 接受消息的监听,这个监听指定Queue(payQueue)客户交易流水的消息
    * @return
    */
    /*
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(payQueue());
    container.setExposeListenerChannel(true);
    container.setMaxConcurrentConsumers(1);
    container.setConcurrentConsumers(1);
    //设置确认模式手工确认
    //设置ack方式为手动,增加对应队列的监听器。acknowledge="manual" 则开启ack机制
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    container.setMessageListener(transactionConsumer);
    return container;
    }
    */
    /**
    * 方式二 @RabbitListener
    * SimpleRabbitListenerContainerFactory 可以生成 RabbitListenerContainer
    */
    //factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
    // 自动响应时 简单配置即可 不需要手动确认是否消费
    @RabbitListener(queues = "payQueue")
    public void displayMail(Mail mail) throws Exception {
    System.out.println("队列监听器1号收到消息"+mail.toString());
    }

    //factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    //需要手动确认消息是否消费时 这样处理
    @RabbitListener(queues = "payQueue")
    public void process(@Payload Mail mail, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws Exception{
    logger.info("rabbit receiver message:{}",mail);
    try {
    channel.basicAck(deliveryTag, false);
    }catch (Exception e){
    logger.error("process message error: {}",e);
    }
    }

    }
    方式二 的消息接受MessageListener  实现类 TransactionConsumerImpl


    import com.alibaba.fastjson.JSONObject;
    import com.rabbitmq.client.Channel;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
    import org.springframework.amqp.support.converter.SimpleMessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import po.Mail;

    import java.io.IOException;
    import java.util.concurrent.ExecutorService;

    @Service("transactionConsumerImpl")
    public class TransactionConsumerImpl implements ChannelAwareMessageListener {
    private static final Logger logger = LoggerFactory.getLogger(TransactionConsumerImpl.class);

    private SimpleMessageConverter converter = new SimpleMessageConverter();
    @Autowired
    ExecutorService threadPool;
    // @Autowired
    //BoluomeFlowService boluomeFlowService;

    //只有在消息处理成功后发送ack确认,或失败后发送nack使信息重新投递
    public void onMessage(final Message message, final Channel channel) throws Exception {
    final String boby = new String(message.getBody(), "utf-8");//转换消息,我们是使用json数据格式
    Object msg = converter.fromMessage(message);
    // todo mail chuli
    System.out.println(JSONObject.toJSONString(msg));
    try {
    //手工返回ACK,通知此消息已经争取消费
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (IOException e1) {
    e1.printStackTrace();
    System.out.println("消息已重复处理失败,拒绝再次接收 失败...");
    }


    /* threadPool.execute(new Runnable() { //多线程处理
    public void run() {
    Jedis jedis = jedisShardInfo.createResource();
    jedis.sadd(TopicRabbitConfig.TRANSACTION_QUEUE, boby);//添加到key为当前消息类型的集合里面,防止丢失消息
    BoluomeFlow flow = gson.fromJson(boby, BoluomeFlow.class);
    String json = gson.toJson(flow);

    if (boluomeFlowService.insert(flow)) { //当添加成功时候返回成功
    logger.info("客户交易流水添加1条记录:{}", json);
    jedis.srem(TopicRabbitConfig.TRANSACTION_QUEUE, boby);//从当前消息类型集合中移除已经消费过的消息
    try {
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//手工返回ACK,通知此消息已经争取消费
    } catch (IOException ie) {
    logger.error("消费成功回调成功,io操作异常");
    }
    } else {
    logger.info("客户交易流水添加失败记录:{}", json);
    }

    try
    {
    System.out.println("consumer--:" + message.getMessageProperties() + ":" + new String(message.getBody()));

    // deliveryTag是消息传送的次数,我这里是为了让消息队列的第一个消息到达的时候抛出异常,处理异常让消息重新回到队列,然后再次抛出异常,处理异常拒绝让消息重回队列
    if (message.getMessageProperties().getDeliveryTag() == 1 || message.getMessageProperties().getDeliveryTag() == 2)
    {
    throw new Exception();
    }
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // false只确认当前一个消息收到,true确认所有consumer获得的消息
    }
    catch (Exception e){
    e.printStackTrace();

    if (message.getMessageProperties().getRedelivered())
    {
    System.out.println("消息已重复处理失败,拒绝再次接收...");
    try {
    channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒绝消息
    } catch (IOException e1) {
    e1.printStackTrace();
    System.out.println("消息已重复处理失败,拒绝再次接收 失败...");
    }
    }
    else
    {
    System.out.println("消息即将再次返回队列处理...");
    try {
    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // requeue为是否重新回到队列
    } catch (IOException e1) {
    e1.printStackTrace();
    System.out.println("requeue为是否重新回到队列 失败...");
    }
    }
    }
    }
    });*/
    }
    }

     本地测试代码地址: xxx

  • 相关阅读:
    【JavaScript知识点一】JavaScript 数据类型
    grunt操作之Gruntfile.js
    js重定向后跳转到当前页面锚点
    Java-变量和方法
    Java-运算符
    Java-类型转化
    Java-数组
    Java-循环结构(for,while)
    Java-选择结构(if-else)
    Java-数据类型(引用类型)
  • 原文地址:https://www.cnblogs.com/xmanblue/p/7920716.html
Copyright © 2011-2022 走看看