zoukankan      html  css  js  c++  java
  • rocketmq事务消息

    rocketmq事务消息

     

    参考:

    https://blog.csdn.net/u011686226/article/details/78106215

    https://yq.aliyun.com/articles/55630

    https://my.oschina.net/u/2950586/blog/760677

     https://blog.csdn.net/chunlongyu/article/details/53844393

     

     

    分布式消息队列RocketMQ--事务消息--解决分布式事务的最佳实践 

    说到分布式事务,就会谈到那个经典的”账号转账”问题:2个账号,分布处于2个不同的DB,或者说2个不同的子系统里面,A要扣钱,B要加钱,如何保证原子性?

    一般的思路都是通过消息中间件来实现“最终一致性”:A系统扣钱,然后发条消息给中间件,B系统接收此消息,进行加钱。

    但这里面有个问题:A是先update DB,后发送消息呢? 还是先发送消息,后update DB?

    假设先update DB成功,发送消息网络失败,重发又失败,怎么办? 
    假设先发送消息成功,update DB失败。消息已经发出去了,又不能撤回,怎么办?

    所以,这里下个结论: 只要发送消息和update DB这2个操作不是原子的,无论谁先谁后,都是有问题的。

    那这个问题怎么解决呢??

    错误的方案0

    有人可能想到了,我可以把“发送消息”这个网络调用和update DB放在同1个事务里面,如果发送消息失败,update DB自动回滚。这样不就保证2个操作的原子性了吗?

    这个方案看似正确,其实是错误的,原因有2:

    (1)网络的2将军问题:发送消息失败,发送方并不知道是消息中间件真的没有收到消息呢?还是消息已经收到了,只是返回response的时候失败了?

    如果是已经收到消息了,而发送端认为没有收到,执行update db的回滚操作。则会导致A账号的钱没有扣,B账号的钱却加了。

    (2)把网络调用放在DB事务里面,可能会因为网络的延时,导致DB长事务。严重的,会block整个DB。这个风险很大。

    基于以上分析,我们知道,这个方案其实是错误的!

    方案1–业务方自己实现

    假设消息中间件没有提供“事务消息”功能,比如你用的是Kafka。那如何解决这个问题呢?

    解决方案如下: 
    (1)Producer端准备1张消息表,把update DB和insert message这2个操作,放在一个DB事务里面。

    (2)准备一个后台程序,源源不断的把消息表中的message传送给消息中间件。失败了,不断重试重传。允许消息重复,但消息不会丢,顺序也不会打乱。

    (3)Consumer端准备一个判重表。处理过的消息,记在判重表里面。实现业务的幂等。但这里又涉及一个原子性问题:如果保证消息消费 + insert message到判重表这2个操作的原子性?

    消费成功,但insert判重表失败,怎么办?关于这个,在Kafka的源码分析系列,第1篇, exactly once问题的时候,有过讨论。

    通过上面3步,我们基本就解决了这里update db和发送网络消息这2个操作的原子性问题。

    但这个方案的一个缺点就是:需要设计DB消息表,同时还需要一个后台任务,不断扫描本地消息。导致消息的处理和业务逻辑耦合额外增加业务方的负担。

    方案2 – RocketMQ 事务消息

    为了能解决该问题,同时又不和业务耦合,RocketMQ提出了“事务消息”的概念。

    具体来说,就是把消息的发送分成了2个阶段:Prepare阶段和确认阶段。

    具体来说,上面的2个步骤,被分解成3个步骤: 
    (1) 发送Prepared消息 
    (2) update DB 
    (3) 根据update DB结果成功或失败,Confirm或者取消Prepared消息。

    可能有人会问了,前2步执行成功了,最后1步失败了怎么办?这里就涉及到了RocketMQ的关键点:RocketMQ会定期(默认是1分钟)扫描所有的Prepared消息,询问发送方,到底是要确认这条消息发出去?还是取消此条消息?

    具体代码实现如下:

    也就是定义了一个checkListener,RocketMQ会回调此Listener,从而实现上面所说的方案。

    // 也就是上文所说的,当RocketMQ发现`Prepared消息`时,会根据这个Listener实现的策略来决断事务
    TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
    // 构造事务消息的生产者
    TransactionMQProducer producer = new TransactionMQProducer("groupName");
    // 设置事务决断处理类
    producer.setTransactionCheckListener(transactionCheckListener);
    // 本地事务的处理逻辑,相当于示例中检查Bob账户并扣钱的逻辑
    TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
    producer.start()
    // 构造MSG,省略构造参数
    Message msg = new Message(......);
    // 发送消息
    SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
    producer.shutdown();
    public TransactionSendResult sendMessageInTransaction(.....)  {
        // 逻辑代码,非实际代码
        // 1.发送消息
        sendResult = this.send(msg);
        // sendResult.getSendStatus() == SEND_OK
        // 2.如果消息发送成功,处理与消息关联的本地事务单元
        LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
        // 3.结束事务
        this.endTransaction(sendResult, localTransactionState, localException);
    }

    总结:对比方案2和方案1,RocketMQ最大的改变,其实就是把“扫描消息表”这个事情,不让业务方做,而是消息中间件帮着做了。

    至于消息表,其实还是没有省掉。因为消息中间件要询问发送方,事物是否执行成功,还是需要一个“变相的本地消息表”,记录事物执行状态。

    人工介入

    可能有人又要说了,无论方案1,还是方案2,发送端把消息成功放入了队列,但消费端消费失败怎么办?

    消费失败了,重试,还一直失败怎么办?是不是要自动回滚整个流程?

    答案是人工介入。从工程实践角度讲,这种整个流程自动回滚的代价是非常巨大的,不但实现复杂,还会引入新的问题。比如自动回滚失败,又怎么处理?

    对应这种极低概率的case,采取人工处理,会比实现一个高复杂的自动化回滚系统,更加可靠,也更加简单。

     

     

    rocketmq事务消息的理解

     

    http://www.cnblogs.com/wxd0108/p/6038543.html

    RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址,第二阶段执行本地事物,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。细心的你可能又发现问题了,如果确认消息发送失败了怎么办?RocketMQ会定期扫描消息集群中的事物消息,这时候发现了Prepared消息,它会向消息发送者确认,Bob的钱到底是减了还是没减呢?如果减了是回滚还是继续发送确认消息呢?RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。

    如果endTransaction方法执行失败,导致数据没有发送到brokerbroker会有回查线程定时(默认1分钟)扫描每个存储事务状态的表格文件,如果是已经提交或者回滚的消息直接跳过,如果是prepared状态则会向Producer发起CheckTransaction请求,Producer会调用DefaultMQProducerImpl.checkTransactionState()方法来处理broker的定时回调请求,而checkTransactionState会调用我们的事务设置的决断方法,最后调用endTransactionOnewaybroker来更新消息的最终状态。

    再回到转账的例子,如果Bob的账户的余额已经减少,且消息已经发送成功,Smith端开始消费这条消息,这个时候就会出现消费失败和消费超时两个问题?解决超时问题的思路就是一直重试,直到消费端消费消息成功,整个过程中有可能会出现消息重复的问题,按照前面的思路解决即可。

    本质上还是个二阶段提交

    重复消费幂等性要自己做

    RocketMQ 事务消息

    源代码版本是3.2.6,还是直接跑源代码。rocketmq事务消息是发生在Producer和Broker之间,是二阶段提交。

    二阶段提交过程看图:

    事务逻辑

    第一阶段是:步骤1,2,3。
    第二阶段是:步骤4,5。

    具体说明:

    只有在消息发送成功,并且本地操作执行成功时,才发送提交事务消息,做事务提交。

    其他的情况,例如消息发送失败,直接发送回滚消息,进行回滚,或者发送消息成功,但是执行本地操作失败,也是发送回滚消息,进行回滚。

    事务消息原理实现过程:

    一阶段:
    Producer向Broker发送1条类型为TransactionPreparedType的消息,Broker接收消息保存在CommitLog中,然后返回消息的queueOffset和MessageId到Producer,MessageId包含有commitLogOffset(即消息在CommitLog中的偏移量,通过该变量可以直接定位到消息本身),由于该类型的消息在保存的时候,commitLogOffset没有被保存到consumerQueue中,此时客户端通过consumerQueue取不到commitLogOffset,所以该类型的消息无法被取到,导致不会被消费。

    一阶段的过程中,Broker保存了1条消息。

    二阶段:
    Producer端的TransactionExecuterImpl执行本地操作,返回本地事务的状态,然后发送一条类型为TransactionCommitType或者TransactionRollbackType的消息到Broker确认提交或者回滚,Broker通过Request中的commitLogOffset,获取到上面状态为TransactionPreparedType的消息(简称消息A),然后重新构造一条与消息A内容相同的消息B,设置状态为TransactionCommitType或者TransactionRollbackType,然后保存。其中TransactionCommitType类型的,会放commitLogOffset到consumerQueue中,TransactionRollbackType类型的,消息体设置为空,不会放commitLogOffset到consumerQueue中。

    二阶段的过程中,Broker也保存了1条消息。

    总结:事务消息过程中,broker一共保存2条消息。

    贴代码:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    <properties>

    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

    <logback.version>1.0.13</logback.version>

    <rocketmq.version>3.2.6</rocketmq.version>

    </properties>

    <dependencies>

    <dependency>

    <groupId>ch.qos.logback</groupId>

    <artifactId>logback-classic</artifactId>

    <version>1.0.13</version>

    </dependency>

    <dependency>

    <groupId>ch.qos.logback</groupId>

    <artifactId>logback-core</artifactId>

    <version>1.0.13</version>

    </dependency>

    <dependency>

    <groupId>com.alibaba.rocketmq</groupId>

    <artifactId>rocketmq-client</artifactId>

    <version>${rocketmq.version}</version>

    </dependency>

    <dependency>

    <groupId>junit</groupId>

    <artifactId>junit</artifactId>

    <version>4.10</version>

    <scope>test</scope>

    </dependency>

    </dependencies>

    TransactionCheckListenerImpl.java

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    package com.zoo.quickstart.transaction;

    import java.util.concurrent.atomic.AtomicInteger;

    import com.alibaba.rocketmq.client.producer.LocalTransactionState;

    import com.alibaba.rocketmq.client.producer.TransactionCheckListener;

    import com.alibaba.rocketmq.common.message.MessageExt;

    /**

    * 未决事务,服务器回查客户端,broker端发起请求代码没有被调用,所以此处代码可能没用。

    */

    public class TransactionCheckListenerImpl implements TransactionCheckListener {

    private AtomicInteger transactionIndex = new AtomicInteger(0);

    @Override

    public LocalTransactionState checkLocalTransactionState(MessageExt msg) {

    System.out.println("server checking TrMsg " + msg.toString());

    int value = transactionIndex.getAndIncrement();

    if ((value % 6) == 0) {

    throw new RuntimeException("Could not find db");

    }

    else if ((value % 5) == 0) {

    return LocalTransactionState.ROLLBACK_MESSAGE;

    }

    else if ((value % 4) == 0) {

    return LocalTransactionState.COMMIT_MESSAGE;

    }

    return LocalTransactionState.UNKNOW;

    }

    }

    本地操作类TransactionExecuterImpl.java

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    package com.zoo.quickstart.transaction;

    import java.util.concurrent.atomic.AtomicInteger;

    import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;

    import com.alibaba.rocketmq.client.producer.LocalTransactionState;

    import com.alibaba.rocketmq.common.message.Message;

    /**

    * 执行本地事务

    */

    public class TransactionExecuterImpl implements LocalTransactionExecuter {

    private AtomicInteger transactionIndex = new AtomicInteger(1);

    @Override

    public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {

    int value = transactionIndex.getAndIncrement();

    if (value == 0) {

    throw new RuntimeException("Could not find db");

    }

    else if ((value % 5) == 0) {

    return LocalTransactionState.ROLLBACK_MESSAGE;

    }

    else if ((value % 4) == 0) {

    return LocalTransactionState.COMMIT_MESSAGE;

    }

    return LocalTransactionState.UNKNOW;

    }

    }

    Producer类:TransactionProducer.java

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    45

    46

    47

    48

    49

    50

    51

    52

    53

    package com.zoo.quickstart.transaction;

    import com.alibaba.rocketmq.client.exception.MQClientException;

    import com.alibaba.rocketmq.client.producer.SendResult;

    import com.alibaba.rocketmq.client.producer.TransactionCheckListener;

    import com.alibaba.rocketmq.client.producer.TransactionMQProducer;

    import com.alibaba.rocketmq.common.message.Message;

    /**

    * 发送事务消息例子

    *

    */

    public class TransactionProducer {

    public static void main(String[] args) throws MQClientException, InterruptedException {

    TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();

    TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");

    // 事务回查最小并发数

    producer.setCheckThreadPoolMinSize(2);

    // 事务回查最大并发数

    producer.setCheckThreadPoolMaxSize(2);

    // 队列数

    producer.setCheckRequestHoldMax(2000);

    producer.setTransactionCheckListener(transactionCheckListener);

    producer.setNamesrvAddr("192.168.0.104:9876");

    producer.start();

    String[] tags = new String[] { "TagA""TagB""TagC""TagD""TagE" };

    TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();

    for (int i = 0; i < 1; i++) {

    try {

    Message msg =

    new Message("TopicTest", tags[i % tags.length], "KEY" + i,

    ("Hello RocketMQ " + i).getBytes());

    SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);

    System.out.println(sendResult);

    Thread.sleep(10);

    }

    catch (MQClientException e) {

    e.printStackTrace();

    }

    }

    for (int i = 0; i < 100000; i++) {

    Thread.sleep(1000);

    }

    producer.shutdown();

    }

    }

    RocketMQ 事务消息

    RocketMQ将事务拆分成小事务异步执行的方式来执行。
        RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址,第二阶段执行本地事物,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。RocketMQ会定期扫描消息集群中的事物消息,这时候发现了Prepared消息,它会向消息发送者确认,RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。
    RocketMQ事务消息:


    TransactionCheckListenerImpl:

    package aaron.mq.producer;
    
    import com.alibaba.rocketmq.client.producer.LocalTransactionState;
    import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    /**
     * Created by Aaron Sheng on 10/19/16.
     * TransactionCheckListenerImpl handle transaction unsettled.
     * Broker will notify producer to check local transaction.
     */
    public class TransactionCheckListenerImpl implements TransactionCheckListener {
    
        @Override
        public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) {
            System.out.println("checkLocalTransactionState");
            System.out.println("topic: " + messageExt.getTopic());
            System.out.println("body: " + messageExt.getBody());
    
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }


    TransactionExecuterImpl:

    package aaron.mq.producer;
    
    import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
    import com.alibaba.rocketmq.client.producer.LocalTransactionState;
    import com.alibaba.rocketmq.common.message.Message;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Created by Aaron Sheng on 10/19/16.
     * TransactionExecuterImpl executre local trancation and return result to broker.
     */
    public class TransactionExecuterImpl implements LocalTransactionExecuter {
        private AtomicInteger transactionIndex = new AtomicInteger(0);
    
        @Override
        public LocalTransactionState executeLocalTransactionBranch(Message message, Object o) {
            System.out.println("executeLocalTransactionBranch " + message.toString());
    
            int value = transactionIndex.getAndIncrement();
            if ((value % 3) == 0) {
                return LocalTransactionState.COMMIT_MESSAGE;
            } else if ((value % 3) == 1) {
                return LocalTransactionState.ROLLBACK_MESSAGE;
            } else{
                return LocalTransactionState.UNKNOW;
            }
        }
    }


    TransactionProducer:

    package aaron.mq.producer;
    
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
    import com.alibaba.rocketmq.client.producer.TransactionMQProducer;
    import com.alibaba.rocketmq.common.message.Message;
    
    /**
     * Created by Aaron Sheng on 10/19/16.
     */
    public class TransactionProducer {
        public static void produce() throws MQClientException {
            TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
            TransactionMQProducer producer = new TransactionMQProducer("TxProducer");
            producer.setCheckThreadPoolMinSize(2);
            producer.setCheckThreadPoolMaxSize(4);
            producer.setCheckRequestHoldMax(2000);
            producer.setTransactionCheckListener(transactionCheckListener);
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.setInstanceName("TxProducer-instance1");
            producer.setVipChannelEnabled(false);
            producer.start();
    
            TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
            try {
                for (int i = 0; i < 1000; i++) {
                    Message msg = new Message("Topic1",
                            "Tag1",
                            "OrderId" + i,
                            ("Body" + i).getBytes());
                    SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
                    System.out.println(sendResult);
                    Thread.sleep(1000);
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            } finally {
                producer.shutdown();
            }
        }
    }
    


    RocketMQConsumer:

    package aaron.mq.consumer;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    /**
     * Created by Aaron Sheng on 10/17/16.
     */
    public class RocketMQConsumer {
        public static void consume() throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Consumer");
            consumer.setNamesrvAddr("127.0.0.1:9876");
            consumer.setVipChannelEnabled(false);
            consumer.setInstanceName("rmq-instance");
            consumer.subscribe("Topic1", "Tag1");
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        System.out.println(msg.getKeys() + " " + new String(msg.getBody()));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.println("Consumer Started.");
        }
    }



  • 相关阅读:
    解决项目迁移至Kubernetes集群中的代理问题
    gorm系列-简单入门
    py操作mongodb总结
    zabbix添加监控项以及常用的键值
    监控服务器cpu、磁盘、模板以及自定义key
    Zabbix+Grafana打造高逼格监控系统
    基于Docker的Mysql主从复制搭建
    rbac权限管理
    Django的认证系统
    Django 中间件
  • 原文地址:https://www.cnblogs.com/xuwc/p/9034029.html
Copyright © 2011-2022 走看看