zoukankan      html  css  js  c++  java
  • RocketMQ学习笔记(10)----RocketMQ的Producer 事务消息使用

    1. 事务消息原理图

     RocketMQ除了支持普通消息,顺序消息之外,还支持了事务消息。

    1. 什么是分布式事务?

      分布式事务就是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。以上是百度百科的解释,简单的说,就是一次大的操作由不同的小操作组成,这些小的操作分布在不同的服务器上,且属于不同的应用,分布式事务需要保证这些小操作要么全部成功,要么全部失败。本质上来说,分布式事务就是为了保证不同数据库的数据一致性。

    2. RocketMQ中分布式事务的使用

      在RocketMQ中分布式事务执行过程分为三个阶段,RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址,第二阶段执行本地事物,第三阶段通过第一阶段拿到的地址去访问消息,并修改消息的状态。当RocketMQ确认消息发送失败时,RocketMQ会定期扫描消息集群中的事物消息,如果发现了Prepared消息,它会向消息发送端(生产者)确认,RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。

      实现方式:

      Producer实现:

    package com.wangx.rocketmq.transaction;
    
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.TransactionListener;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    import java.io.UnsupportedEncodingException;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class TransactionProducer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
    
            //初始化TransactionListenerImpl
            TransactionListener transactionListener = new TransactionListenerImpl();
    
            //创建事物transactionProducer
            TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
            //由于本地回调监听跟消息的发送会并发进行,所以可以使用线程池来执行操作
            ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            });
    
            producer.setNamesrvAddr("47.105.149.61:9876;47.105.145.123:9876");
            //设置线程池
            producer.setExecutorService(executorService);
            //设置事物监听
            producer.setTransactionListener(transactionListener);
            producer.start();
    
            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 10; i++) {
                try {
                    Message msg =
                        new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                            ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                    System.out.printf("%s%n", sendResult);
    
                    Thread.sleep(10);
                } catch (MQClientException | UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }
            producer.shutdown();
        }
    }

      事务监听实现方式:

    package com.wangx.rocketmq.transaction;
    
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.TransactionListener;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class TransactionListenerImpl implements TransactionListener {
        private AtomicInteger transactionIndex = new AtomicInteger(0);
    
        private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
    
        /**
         * send()
         * @param msg send中的message对象,
         * @param arg send方法中回调函数之后的传入的参数
         * @return
         */
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            int value = transactionIndex.getAndIncrement();
            int status = value % 3;
            localTrans.put(msg.getTransactionId(), status);
            //提交本地事物
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    
        //更新本地事物的最终状态
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            Integer status = localTrans.get(msg.getTransactionId());
            if (null != status) {
                switch (status) {
                    case 0:
                        return LocalTransactionState.UNKNOW;
                    case 1:
                        return LocalTransactionState.COMMIT_MESSAGE;
                    case 2:
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    default:
                        return LocalTransactionState.COMMIT_MESSAGE;
                }
            }
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    }

    RocketMQ事物的三种状态:

      ROLLBACK_MESSAGE:回滚事务

      COMMIT_MESSAGE: 提交事务

      UNKNOW: broker会定时的回查Producer消息状态,直到彻底成功或失败。

         当executeLocalTransaction方法返回ROLLBACK_MESSAGE时,表示直接回滚事务,当返回COMMIT_MESSAGE提交事务

      当返回UNKNOW时,Broker会在一段时间之后回查checkLocalTransaction,根据checkLocalTransaction返回状态执行事务的操作(回滚或提交),

      如示例中,当返回ROLLBACK_MESSAGE时消费者不会收到消息,且不会调用回查函数,当返回COMMIT_MESSAGE时事务提交,消费者收到消息,当返回UNKNOW时,在一段时间之后调用回查函数,并根据status判断返回提交或回滚状态,返回提交状态的消息将会被消费者消费,所以此时消费者可以消费部分消息。

    原文 RocketMQ学习笔记(10)----RocketMQ的Producer 事务消息使用

  • 相关阅读:
    AIX 开机启动网络服务配置
    aix 6+ mount 光驱
    AIX 系统中 PVID 的含义与作用
    lsslot
    hp小机定位网卡位置
    HP 7440老机器重启
    ntp -q 输出说明
    使用过滤器实现网站访问计数器的功能
    过滤器:-------------创建并配置过滤器:
    什么是Servlet,Servlet的作用,生命周期,如何创建、配置Servlet
  • 原文地址:https://www.cnblogs.com/xiaoshen666/p/10867610.html
Copyright © 2011-2022 走看看