zoukankan      html  css  js  c++  java
  • 分布式事务——幂等设计(rocketmq案例)

      幂等指的就是执行多次和执行一次的效果相同,主要是为了防止数据重复消费。MQ中为了保证消息的可靠性,生产者发送消息失败(例如网络超时)会触发 "重试机制",它不是生产者重试而是MQ自动触发的重试机制, 而这种情况下消费者就会收到两条消息,比如明明只需要扣一次款, 可是消费者却执行了2次。为了解决幂等问题,每一个消息应该有一个全局的唯一的标识,当处理过这条消息后,就把这个标识保存到数据库或者redis中,在处理消息前前判断这个标识记录为空就好了。像activemq中msgId就是唯一的,我们可以直接拿这个id来判断,但是rocketmq重试机制不一样,它重发会产生一个新的id,但是它提供了setKeys()这个api,我们可以给key设置一个唯一的流水编号来加以判断。(重试机制是不存在并发问题的,它是间隔一段时间自动促发的)。

    1. 导入依赖( 生产者和消费者的依赖都一样)

        <parent> 
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.5.7.RELEASE</version>
            <relativePath/> 
        </parent>
        <!-- springcloud
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>Camden.SR6</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
         -->
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>
        <dependencies>
            <!-- webmvc -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <!-- 集成lombok 框架(get/set) -->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
            </dependency>
            <!-- RocketMq -->
            <dependency>
                <groupId>com.alibaba.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>3.2.6</version>
            </dependency>
            <!-- 热加载 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-devtools</artifactId>
                <optional>true</optional>
            </dependency>
            <!-- jackson -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.30</version>
            </dependency>
        </dependencies>
      
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build> 
    pom.xml

    2. 生产者配置参数和配置文件

    #该应用是否启用生产者
    #rocketmq.producer.isOnOff=on
    #发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
    rocketmq.producer.groupName=mqtest
    #mq的nameserver地址
    rocketmq.producer.namesrvAddr=192.168.5.7:9876
    #消息最大长度 默认1024*4(4M)
    rocketmq.producer.maxMessageSize=4096
    #发送消息超时时间,默认3000
    rocketmq.producer.sendMsgTimeout=3000
    #发送消息失败重试次数,默认2
    rocketmq.producer.retryTimesWhenSendFailed=3
    rocketmq.properties
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.PropertySource;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    /**
     * 生产者配置
     */
    @PropertySource("classpath:rocketmq.properties")
    @Configuration
    public class MQProducerConfiguration {
        public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfiguration.class);
        /**
         * 发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
         */
        @Value("${rocketmq.producer.groupName}")
        private String groupName;
        /** 服务器地址  */
        @Value("${rocketmq.producer.namesrvAddr}")
        private String namesrvAddr;
        /**
         * 消息最大大小,默认4M
         */
        @Value("${rocketmq.producer.maxMessageSize}")
        private Integer maxMessageSize ;
        /**
         * 消息发送超时时间,默认3秒
         */
        @Value("${rocketmq.producer.sendMsgTimeout}")
        private Integer sendMsgTimeout;
        /**
         * 消息发送失败重试次数,默认2次
         */
        @Value("${rocketmq.producer.retryTimesWhenSendFailed}")
        private Integer retryTimesWhenSendFailed;
    
        @Bean
        public DefaultMQProducer getRocketMQProducer() {
            DefaultMQProducer producer;
            producer = new DefaultMQProducer(this.groupName);
            producer.setNamesrvAddr(this.namesrvAddr);
            //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
            //producer.setInstanceName(instanceName);
            producer.setMaxMessageSize(this.maxMessageSize);
            producer.setSendMsgTimeout(this.sendMsgTimeout);
            //如果发送消息失败,设置重试次数,默认为2次
            producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
            try {
                producer.start();
                LOGGER.info(String.format("rocketmq producer start "));
            } catch (MQClientException e) {
                LOGGER.error(String.format("producer is error {}", e.getMessage(),e));
            }
            return producer;
        }
    }
    MQProducerConfiguration

    3. 生产者发送消息

    import java.util.UUID;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    import lombok.extern.slf4j.Slf4j;
    @RestController
    @Slf4j
    public class TestController {
    
        /**使用RocketMq的生产者*/
        @Autowired
        private DefaultMQProducer defaultMQProducer;
        
        @RequestMapping("/send")
        public void send(){
            String msg = "幂等";
            log.info("开始发送消息:"+msg);
            
            try {
                // arg0主题名称    arg1分组    arg2内容
                Message sendMsg = new Message("DemoTopic","wulei",(msg).getBytes());
                // 注意: activemq的msgId是唯一的,但是rocketmq的不是,所以幂等不能用id来判断,我们可以通过setKeys来解决,一般都是业务id,这里用随机数代替。
                sendMsg.setKeys(UUID.randomUUID().toString());
                SendResult sendResult = defaultMQProducer.send(sendMsg);
                //默认3秒超时
                log.info("消息发送响应信息:"+sendResult.toString());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    TestController

    4. 消费者配置参数和配置文件

    ##该应用是否启用消费者
    #rocketmq.consumer.isOnOff=on
    #发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
    rocketmq.consumer.groupName=mqtest
    #mq的nameserver地址
    rocketmq.consumer.namesrvAddr=192.168.5.7:9876
    #该消费者订阅的主题和tags("*"号表示订阅该主题下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*;
    rocketmq.consumer.topics=DemoTopic~*;
    rocketmq.consumer.consumeThreadMin=20
    rocketmq.consumer.consumeThreadMax=64
    #设置一次消费消息的条数,默认为1条
    rocketmq.consumer.consumeMessageBatchMaxSize=1
    rocketmq.properties
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.PropertySource;
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.wulei.listener.MQConsumeMsgListenerProcessor;
    /**
     * 消费者配置
     */
    @PropertySource("classpath:rocketmq.properties")
    @Configuration
    public class MQConsumerConfiguration {
        public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class);
        // 地址
        @Value("${rocketmq.consumer.namesrvAddr}")
        private String namesrvAddr;
        // 发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
        @Value("${rocketmq.consumer.groupName}")
        private String groupName;
        // 该消费者订阅的主题和tags("*"号表示订阅该主题下所有的tags)
        @Value("${rocketmq.consumer.topics}")
        private String topics;
        
        @Value("${rocketmq.consumer.consumeThreadMin}")
        private int consumeThreadMin;
        @Value("${rocketmq.consumer.consumeThreadMax}")
        private int consumeThreadMax;
        @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
        private int consumeMessageBatchMaxSize;
        
        @Autowired
        private MQConsumeMsgListenerProcessor mqMessageListenerProcessor;
        
        @Bean
        public DefaultMQPushConsumer getRocketMQConsumer(){
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
            consumer.setNamesrvAddr(namesrvAddr);
            consumer.setConsumeThreadMin(consumeThreadMin);
            consumer.setConsumeThreadMax(consumeThreadMax);
            consumer.registerMessageListener(mqMessageListenerProcessor);
            /**
             * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
             * 如果非第一次启动,那么按照上次消费的位置继续消费
             */
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            /**
             * 设置消费模型,集群还是广播,默认为集群
             */
            //consumer.setMessageModel(MessageModel.CLUSTERING);
            /**
             * 设置一次消费消息的条数,默认为1条
             */
            consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
            try {
                /**
                 * 设置该消费者订阅的主题和tag,如果是订阅该主题下的所有tag,则tag使用*;如果需要指定订阅该主题下的某些tag,则使用||分割,例如tag1||tag2||tag3
                 */
                String[] topicTagsArr = topics.split(";");
                for (String topicTags : topicTagsArr) {
                    String[] topicTag = topicTags.split("~");
                    consumer.subscribe(topicTag[0],topicTag[1]);
                }
                consumer.start();
                LOGGER.info("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr);
            }catch (MQClientException e){
                LOGGER.error("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr,e);
            }
            return consumer;
        }
    }
    MQConsumerConfiguration

    5. 消费者监听消息

    import java.util.HashMap;
    import java.util.List;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.common.message.MessageExt;
    @Component
    public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently{
        private static final Logger logger = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);
       
        // 假装这是一个redis
        private HashMap<String, String> myredis = new HashMap<String, String>();
        
        /**
         *  默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息<br/>
         *  不要抛异常,如果没有return CONSUME_SUCCESS ,consumer会重新消费该消息,直到return CONSUME_SUCCESS
         */
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    //        if(CollectionUtils.isEmpty(msgs)){
    //            logger.info("接受到的消息为空,不处理,直接返回成功");
    //            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    //        }
    //        
    
            //for(MessageExt messageExt : msgs) { }
            MessageExt messageExt = msgs.get(0);
            String keys = messageExt.getKeys();// 自定义的唯一key
            String msgId = null;                // 消息id(不是唯一的)
            String msgContext = null;            // 消息内容
            int reconsume = 0;                   // 重试次数
            
            
            if(messageExt.getTopic().equals("DemoTopic") && messageExt.getTags().equals("wulei")){
                if(myredis.get(keys)==null) { 
                    //logger.info("接受到的消息为:"+messageExt.toString());
                      msgId = messageExt.getMsgId();
                      msgContext = new String(messageExt.getBody());
                      reconsume = messageExt.getReconsumeTimes();
                      try {
                          int i = 1/0;
                          System.out.println("消费成功: id:"+msgId+"  msg"+msgContext+"   次数"+reconsume);
                          myredis.put(messageExt.getKeys(), msgContext);
                          return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    } catch (Exception e) {
                        // 重试3次就不在重试了,直接返回消费成功状态,并触发人工补偿机制。
                        if(reconsume==2) {
                            myredis.put(messageExt.getKeys(), msgContext);
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }else {
                            // 一般消费者这边尽量不要抛异常,它失败就会触发重试机制。如果非要抛异常可以在try{}catch{}里面return ConsumeConcurrentlyStatus.RECONSUME_LATER(表示失败让他重试)
                            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                        }
                    }
                }else {
                    // 已经消费过就不要再重试了,直接返回成功。
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            }else {
                // 不存在不要再重试了,直接返回成功。
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        }
    }
    MQConsumeMsgListenerProcessor
  • 相关阅读:
    浅谈python中selenium库调动webdriver驱动浏览器的实现原理
    浅谈python面向对象编程和面向过程编程的区别
    python中可变与不可变类型的全局变量
    冒泡排序和sort,sorted排序函数
    浅谈python之利用pandas和openpyxl读取excel数据
    configparser读取配置文件时的相对路径问题
    关于网站登录后的页面操作所携带的不同cookie值
    【转】Cookie和Session和Cache
    python输出九九乘法表
    win10系统使用小技巧【转】
  • 原文地址:https://www.cnblogs.com/wlwl/p/10155889.html
Copyright © 2011-2022 走看看