zoukankan      html  css  js  c++  java
  • RocketMq(五、事务消息)

    生产者对ID为12的用户进行修改操作(年龄增加一岁),并将该用户信息作为事务消息发送给MQ,保证本地事务与消息发送的一致性:本地事务成功则消息提交,本地事务失败则消息回滚。

    事务原理

    https://www.cnblogs.com/huangying2124/p/11702761.html 

    可以看看这位大牛写的博客,这里就不具体介绍原理,只贴代码。

    test类

    package com.wk.test.rocketmqTest.transaction;
    
    import com.wk.entity.User;
    import com.wk.service.UserService;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    /**
     * Spring-context test that pushes one user through the transactional-message
     * {@link Producer}.
     *
     * <p>The test has no assertions; it only drives the send path so the output
     * printed by the producer and the transaction listener can be inspected.
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class MQTest {

        @Autowired
        private Producer producer;

        @Autowired
        private UserService userService;

        /**
         * Looks up the user with id 12 and hands it to
         * {@link Producer#exeutorTransaction(User)}.
         *
         * @throws MQClientException declared for RocketMQ client failures
         */
        @Test
        public void ss() throws MQClientException {
            producer.exeutorTransaction(userService.findUserById(12));
        }
    }

    生产者

    package com.wk.test.rocketmqTest.transaction;
    
    import com.alibaba.fastjson.JSONObject;
    import com.wk.entity.User;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.*;
    
    /**
     * Sends {@link User} payloads to RocketMQ as transactional messages.
     *
     * <p>A single {@link TransactionMQProducer} is started lazily on the first send and
     * reused afterwards. Starting a fresh producer per call (as this class used to do)
     * only works once per JVM: the RocketMQ client rejects {@code start()} for a producer
     * group that is already registered, and every call also leaked a thread pool and the
     * producer's connections. Keeping the producer alive additionally lets it answer the
     * broker's transaction check-backs through the configured listener.
     *
     * <p>Call {@link #shutdown()} when the application stops to release the producer and
     * its check-back thread pool.
     */
    @Component
    public class Producer {

        @Autowired
        private TransactionListenerImpl transactionListener;

        /** Shared producer; {@code null} until the first successful start or after shutdown. */
        private TransactionMQProducer producer;

        /** Thread pool handed to the producer for transaction check-backs. */
        private ExecutorService checkExecutor;

        /**
         * Marks the user with the description "年龄加一", serializes it to JSON and sends it
         * to topic {@code TransactionTopic} (tag {@code transaction}, key {@code key}) inside
         * a RocketMQ transaction. The send result is printed to standard output.
         *
         * <p>Client failures ({@link MQClientException}) are printed and not rethrown, so
         * the caller cannot tell from this method whether the send succeeded.
         *
         * @param user the user to send; must not be {@code null}
         */
        public void exeutorTransaction(User user){
            try {
                TransactionMQProducer activeProducer = startedProducer();
                user.setDesc("年龄加一");
                // Encode explicitly as UTF-8: the listener decodes the body with "UTF-8",
                // and the platform default charset is not guaranteed to match.
                byte[] body = JSONObject.toJSONString(user).getBytes(StandardCharsets.UTF_8);
                Message message = new Message("TransactionTopic","transaction","key", body);
                // Opaque argument passed through to the listener's executeLocalTransaction.
                Map<String, Object> map = new HashMap<>();
                map.put("1", "测试参数1");
                map.put("2","测试参数2");
                SendResult sendResult = activeProducer.sendMessageInTransaction(message,map);
                System.out.println(sendResult);
            } catch (MQClientException e) {
                e.printStackTrace();
            }

        }

        /**
         * Stops the shared producer and its thread pool, if they were started.
         * Safe to call more than once; a later send starts a new producer.
         */
        public synchronized void shutdown() {
            if (producer == null) {
                return;
            }
            try {
                producer.shutdown();
            } finally {
                checkExecutor.shutdown();
                producer = null;
                checkExecutor = null;
            }
        }

        /**
         * Returns the shared producer, creating and starting it on first use.
         *
         * @throws MQClientException if the producer cannot be started; nothing is cached
         *                           in that case, so the next call retries
         */
        private synchronized TransactionMQProducer startedProducer() throws MQClientException {
            if (producer != null) {
                return producer;
            }
            TransactionMQProducer candidate = new TransactionMQProducer("transaction_producer");
            candidate.setNamesrvAddr("10.32.16.179:9876");
            ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("transaction_thread");
                    return thread;
                }
            });
            candidate.setSendMsgTimeout(10000);
            candidate.setExecutorService(executorService);
            candidate.setTransactionListener(transactionListener);
            try {
                candidate.start();
            } catch (MQClientException | RuntimeException e) {
                // Do not leak the pool when the producer never came up.
                executorService.shutdown();
                throw e;
            }
            checkExecutor = executorService;
            producer = candidate;
            return candidate;
        }
    }

    事务监听

    package com.wk.test.rocketmqTest.transaction;
    
    import com.alibaba.fastjson.JSONObject;
    import com.wk.entity.User;
    import com.wk.service.UserService;
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.TransactionListener;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Transaction listener for the transactional-message producer: runs the local
     * user update for a half message and answers the broker's later status checks.
     *
     * <p>The outcome of every local transaction is remembered per transaction id so that
     * a check-back reports what actually happened instead of unconditionally committing.
     *
     * <p>NOTE(review): outcomes are kept in memory only and are never evicted. They are
     * lost on restart (check-backs then report {@code UNKNOW}) and the map grows with the
     * number of messages; a production listener should derive the state from durable
     * storage instead.
     */
    @Component
    public class TransactionListenerImpl implements TransactionListener {

        /** Final state of each executed local transaction, keyed by message transaction id. */
        private final Map<String, LocalTransactionState> localTrans = new ConcurrentHashMap<>();

        @Autowired
        private UserService userService;

        /**
         * Decodes the message body as a UTF-8 JSON {@link User}, applies it through
         * {@code userService.updateUser} and reports the result.
         *
         * @param message the half message whose body carries the user JSON
         * @param o       the argument given to {@code sendMessageInTransaction}; only printed
         * @return {@code COMMIT_MESSAGE} if the update returned normally,
         *         {@code ROLLBACK_MESSAGE} if anything threw
         */
        @Override
        public LocalTransactionState executeLocalTransaction(Message message, Object o) {
            LocalTransactionState state;
            try {
                String body = new String(message.getBody(),"UTF-8");
                User user = JSONObject.parseObject(body,User.class);
                System.out.println(o);
                userService.updateUser(user);
                state = LocalTransactionState.COMMIT_MESSAGE;
            } catch (Exception e) {
                // Boundary catch: any failure of the local work must roll the message back.
                e.printStackTrace();
                state = LocalTransactionState.ROLLBACK_MESSAGE;
            }
            String transactionId = message.getTransactionId();
            if (transactionId != null) {
                // Remember the outcome so a later check-back can repeat it.
                localTrans.put(transactionId, state);
            }
            return state;
        }

        /**
         * Answers a broker check-back with the recorded outcome of the local transaction.
         *
         * @param messageExt the half message being checked
         * @return the recorded state, or {@code UNKNOW} when no outcome is recorded for the
         *         message's transaction id (for example the local transaction is still
         *         running), so that the message is neither committed nor rolled back blindly
         */
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
            String transactionId = messageExt.getTransactionId();
            if (transactionId == null) {
                return LocalTransactionState.UNKNOW;
            }
            LocalTransactionState recorded = localTrans.get(transactionId);
            return recorded != null ? recorded : LocalTransactionState.UNKNOW;
        }
    }

    git源码地址

    https://github.com/wangkang2/springboot

  • 相关阅读:
    ubuntu英文环境下使用中文输入法
    Flex 调用添加了SoapHeader的web service
    RoR: Ruby On Rails Web Service 3 分发模式
    C# CRC8实现
    java正则表达式过滤html标签
    静态内部类和非静态内部类的区别
    Java反射机制
    java回调函数简介
    Java之泛型编程
    Java基础知识之系统命令调用、序列化、JDO、匿名内部类
  • 原文地址:https://www.cnblogs.com/Unlimited-Blade-Works/p/12447540.html
Copyright © 2011-2022 走看看