zoukankan      html  css  js  c++  java
  • RocketMQ-事务消费

    理论部分在https://www.jianshu.com/p/453c6e7ff81c中的 “三、事务消息”。下面从代码层面看一下rockemq的事务消息

    一、事务消费端。

      从代码中看到跟其他模式的消费端没有什么两样。

    package org.hope.lee.consumer.transaction;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    import java.io.UnsupportedEncodingException;
    import java.util.List;
    
    public class ConsumerTransaction {
        public ConsumerTransaction() {
            String group_name = "transaction_consumer";
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
            consumer.setNamesrvAddr("192.168.31.xxx:9876;192.168.31.xxx:9876");
            try {
                consumer.subscribe("TopicTransaction", "*");
                consumer.registerMessageListener(new Listener());
                consumer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
    
        }
    
        class Listener implements MessageListenerConcurrently{
    
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    
                    try {
                        for(MessageExt msg : list) {
                            String topic = msg.getTopic();
                            String msgBody = new String(msg.getBody(), "utf-8");
                            String tags = msg.getTags();
                            System.out.println("收到消息:" + "topic:" + topic + ", tags:" + tags + ",msg:" + msgBody);
                            msg.getTags();
                        }
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
    
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        }
        public static void main(String[] args) {
            ConsumerTransaction c = new ConsumerTransaction();
            System.out.println("transaction consumer start......");
        }
    }

    二、本地事务的执行器,实现 LocalTransactionExecuter。

    package org.hope.lee.producer.transaction;
    
    import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
    import com.alibaba.rocketmq.client.producer.LocalTransactionState;
    import com.alibaba.rocketmq.common.message.Message;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class TransactionExecuterImpl implements LocalTransactionExecuter{
        private AtomicInteger transactionIndex = new AtomicInteger(1);
        public LocalTransactionState executeLocalTransactionBranch(Message message, Object o) {
            System.out.println("msg = " + new String(message.getBody()));
            System.out.println("o = " + o);
            String tag = message.getTags();
            if(tag.equals("Transaction3")) {
                //这里有一个分阶段提交任务的概念
                System.out.println("这里处理业务逻辑,比如操作数据库,失败情况下进行ROLLBACK");
    
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
            return LocalTransactionState.COMMIT_MESSAGE;
    //        return LocalTransactionState.ROLLBACK_MESSAGE;
    //        return LocalTransactionState.COMMIT_MESSAGE.UNKNOW;
        }
    }

    三、事务Producer端

      在这里可以看到我们用了new TransactionMQProducer()。并且在发送消息的时候添加了事务执行器producer.sendMessageInTransaction(msg, transactionExecuter, "tq")

    package org.hope.lee.producer.transaction;
    
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.*;
    import com.alibaba.rocketmq.common.message.Message;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    import java.util.concurrent.TimeUnit;
    
    public class ProducerTransaction {
        public static void main(String[] args) throws MQClientException {
            String group_name = "transaction_producer";
            //这里使用TransactionMQProducer
            final TransactionMQProducer producer = new TransactionMQProducer(group_name);
            producer.setNamesrvAddr("192.168.xx.xxx:9876;192.168.xx.xxx:9876");
            //事务最小并法数
            producer.setCheckThreadPoolMinSize(5);
            //事务最大并发数
            producer.setCheckThreadPoolMaxSize(20);
            //队列数
            producer.setCheckRequestHoldMax(2000);
            /**
             * Producer对象在使用之前必须要调用start()初始化,初始化一次即可
             * 注意:切记不可以在每次发送消息时,都调用start()
             */
            producer.start();
            //服务器回调Producer,检查本地事务分支成功还是失败
            //rocketmq会定时的调用这个checklistener,
            //在这里,我们可以根据由MQ回传的key去数据库查询,
            //判断这条数据到底是成功了还是失败了。
            //就是在这个定时check,rocketmq把这个功能在开源的代码中去除掉了。
            producer.setTransactionCheckListener(new TransactionCheckListener() {
                public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) {
                    System.out.println("key: " + messageExt.getKeys());
                    System.out.println("state--" + new String(messageExt.getBody()));
                    // return LocalTransactionState.ROLLBACK_MESSAGE;
                    return LocalTransactionState.COMMIT_MESSAGE;
                    // return LocalTransactionState.UNKNOW;
                }
            });
            /**
             * 下面这段代码表明一个Producer对象可以发送多个topic, 多个tag的消息
             * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态
             * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用
             * 如果对消息可靠性要求很高需要对这种情况做处理。另外,消息可能会存在发送失败的情况
             * 失败重试由应用来处理
             */
            TransactionExecuterImpl transactionExecuter = new TransactionExecuterImpl();
            for(int i = 1; i <= 3; i++) {
                Message msg = new Message("TopicTransaction", "Transaction" + i, "key",
                        ("Hello Rocket" + i).getBytes());
                SendResult result = producer.sendMessageInTransaction(msg, transactionExecuter, "tq");
                System.out.println(result);
    
                try {
                    TimeUnit.MICROSECONDS.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            /**
             * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
             * 注意:我们建议应用在JBOSS,Tomcat等容器的退出钩子里调用shutdown 方法
             */
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                public void run() {
                    producer.shutdown();
                }
            }));
    
            System.exit(0);
        }
    }

    https://gitee.com/huayicompany/RocketMQ-learn/tree/master/rocketmq-api

    实验步骤:

    1、先运行Consumer端

    2、运行Producer端,然后看Consumer端和Produer端的控制台输出。

    Consumer端Console

    收到消息:topic:TopicTransaction, tags:Transaction1,msg:Hello Rocket1
    收到消息:topic:TopicTransaction, tags:Transaction2,msg:Hello Rocket2

    Producer端Console

    msg = Hello Rocket1
    o = tq
    SendResult [sendStatus=SEND_OK, msgId=C0A81FB000002A9F0000000000003CDC, messageQueue=MessageQueue [topic=TopicTransaction, brokerName=broker-a, queueId=0], queueOffset=0]
    msg = Hello Rocket2
    o = tq
    SendResult [sendStatus=SEND_OK, msgId=C0A81FB000002A9F0000000000003DA3, messageQueue=MessageQueue [topic=TopicTransaction, brokerName=broker-a, queueId=1], queueOffset=0]
    msg = Hello Rocket3
    o = tq
    这里处理业务逻辑,比如操作数据库,失败情况下进行ROLLBACK
    SendResult [sendStatus=SEND_OK, msgId=C0A81FB000002A9F0000000000003FF8, messageQueue=MessageQueue [topic=TopicTransaction, brokerName=broker-a, queueId=2], queueOffset=0]

    从输出结果来看,第三条消息并没有被Consumer端消费,被回滚了。

    四、RocketMQ完整版事务

      虽然rocketmq 3.2.6版本把事务部分给阉割了。但是我们还是可以了解一下其原理的。

    1.0、 Producer发送消息到RocketMQ,这条消息我们暂且称之为message1 并且是transaction状态的事务消息。

    1.1、MQ把message1存入数据库,并且是状态是prepared

    1.2、RocketMQ回调Producer中的本地事务。(本地事务由三个状态COMMIT_MESSAGE、ROLLBACK_MESSAGE、UNKNOW)。

    1.2.1 、本地事务处理完成后,无论成功还是失败都会有一个状态,如果成功的话,Producer就会发送COMMIT_MESSAGE状态表示确认消息到RocketMQ上。

    1.2.2、然后把message1这个消息存储到consumer queue中,并在数据库中把这条prepared的消息标记为commited。

    1.2.3、这条消息就被Consumer消费了。

    1.3.1、如果Producer的事务处理返回了一个UNKNOW状态。因为broker会定时的去扫描数据库,如果数据库中的数据状态是commited的,那么就清除这条数据。

    1.4、如果数据库中数据的状态还是prepared的。那MQ就会主动的去调用Producer中的check方法。

    1.5.1、check方法再去查本地的数据库看有没有减钱,如果没减钱的话就rollback,

    1.6.1、rollback后Producer又发了一条ROLLBACK_MESSAGE给MQ。

    1.7.1、MQ收到这条消息后,就会把MQ的数据库中对应的prepared数据给清除掉。那么这条数据也就不会被Consumer端消费了

    1.5.2、check方法查本地数据库看有没有减钱,如果减钱了。

    1.6.2、会给MQ发送一个COMMIT_MESSAGE。

    1.7.2、MQ还会去查自己的数据库,然后把数据库中对应的数据给清除掉

  • 相关阅读:
    部署tomcat应用的三种方法
    【转】Linux下如何查看CPU信息, 包括位数和多核信息
    Tomcat端口占用的处理方式
    jconsole tomcat内存监控设置
    [转]成功的 Web 应用系统性能测试
    [转]Xmanager连接Linux远程桌面(后面添加了自己的部分)
    【转】配置远程jconsole监测tomcat
    大同小忆记五一大同之行
    理解Tomcat的WebappClassLoader(web应用类加载器)
    如何用PS批量为照片添加图片和文字水印
  • 原文地址:https://www.cnblogs.com/happyflyingpig/p/8283525.html
Copyright © 2011-2022 走看看