zoukankan      html  css  js  c++  java
  • 分布式事务之可靠消息

    什么是可靠消息?

    为什么我们需要它,为什么我们要强调可靠?

    生产方 消息发送出去了,如果生产方收到了消息的正常反馈,那么我们就可以知道消息的确切的状态。 如果消息无响应 或者超时了呢? 有多个情况,

    1 消息未到达mq,发送途中 就某些原因丢失了,

    2 消息送达mq,但是mq处理未完成就丢失(这里又可以细分为:mq未记录日志,已记录日志但未落盘消息,已落盘但未来得及响应请求,已落盘但未完成推送(仅仅针对推的情况))。

    3 消息送达mq,消息也已经被mq 处理完毕,但是响应在 网络途中 丢失。

    4 生产方对发送的消息设置超时时间。 虽然消息送达mq,消息也已经被mq处理,也返回来了,但是由于此时已经超时,生产方已经断开了网络连接,从而丢弃了响应。

    尽管我们可以尽量的确保MQ可靠,让mq 可靠的持久化消息,但是网络 是不可靠的, 几乎没有办法确保 网络 可靠。。。 ( 网络可靠就这么难吗??)

    如果知道是情况1、2,我们可以重新发送消息即可,也就是重试。(当然,如果网络问题,或者mq挂掉了,重试也没有,只有等待 这些问题回复才重试才有意义,因此,我们可以设置一个 比较长的、“按照指数爆炸” 的  “重试间隔时间”)

    如果知道是情况3,如果我们不需要消息id,那么我们可以认为 消息发送成功,业务也处理成功。不用重试了!

    对于前面3个情况,生产方是无法判断 消息到底mq 是否已经处理好了, 这就显得 “不可靠”了, 除了量子力学,没人喜欢不确定性。 有可能1 、 2  也有可能是3,怎么办? 或许我们可以 通过查询mq 的方式(也就是peek 一下,但是不消费)判断 是否是3。 

    所以,我们期望有一个可靠消息,能够避免任何问题,包括网络问题。 如果消息不可靠,那么我们就需要采取其他的措施,比如之前讲的 本地消息表。。。

     

    分布式事务大致可以分为以下四种( 不知道是什么样的一个分类 准则):

    • 两阶段型
    • 补偿型
    • 异步确保型
    • 最大努力通知型

    可靠消息, 属于 异步确保型。 why?后面会说明。

    可靠消息 的实现

    可靠消息 可能有很多实现方式,但一般就是指事务型消息。可靠消息 一般也是基于MQ的。 前面说过了基于本地消息表的分布式事务。基于本地消息表的分布式事务 其实也可以认为是 基于MQ的分布式事务的 一种情况。

    基于MQ的分布式事务:

    生产方处理过程:
    
    1 主动方应用先把消息发给消息中间件,消息状态标记为“待确认”;
    2 消息中间件收到消息后,把消息持久化到消息存储中,但并不向被动方应用投递消息;
    3 消息中间件返回消息持久化结果(成功/失败),主动方应用根据返回结果进行判断如何进行业务操作处理:
          失败:放弃业务操作处理,结束(必要时向上层返回失败结果);
          成功:执行业务操作处理;
    4 业务操作完成后,把业务操作结果(成功/失败)发送给消息中间件;
    5 消息中间件收到业务操作结果后,根据业务结果进行处理;
          失败:删除消息存储中的消息,结束;
          成功:更新消息存储中的消息状态为“待发送(可发送)”,紧接着执行
    消息投递;
    
    6 前面的正向流程都成功后,向被动方应用投递消息;


    消息发送一致性方案的正向流程是可行的,但异常流程怎么处理呢?
    消息发送到消息中间件中能得到保障了,但消息的准确消费(投递)又如何保障呢?
    有没有支持这种发送一致性流程的现成消息中间件?
    —— 其实是有的,RocketMQ, 另外我认为, 可以消费方自己去消费,而不是推消息给 消费方,
    会不会更好? 推的话 会有一些延迟,但是 这样也降低了 MQ的压力。
    --------------------- 
    作者:chenshiying007 
    来源:CSDN 
    原文:    https://blog.csdn.net/qq_27384769/article/details/79305402 
    版权声明:本文为博主原创文章,转载请附上博文链接!

    基于RocketMQ的分布式事务:

    在RocketMQ中实现了分布式事务,实际上其实是对本地消息表的一个封装,将本地消息表移动到了MQ内部。

    下面简单介绍一下MQ事务,如果想对其详细了解可以参考: https://www.jianshu.com/p/453c6e7ff81c。 

    基本流程如下: 第一阶段Prepared消息,会拿到消息的地址。

    第二阶段执行本地事务。

    第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。消息接受者就能使用这个消息。

    如果确认消息失败,在RocketMq Broker中提供了定时扫描没有更新状态的消息,如果有消息没有得到确认,会向消息发送者发送消息,来判断是否提交,在rocketmq中是以listener的形式给发送者,用来处理。 

    如果消费超时,则需要一直重试,消息接收端需要保证幂等。如果消息消费失败,这个就需要人工进行处理,因为这个概率较低,如果为了这种小概率时间而设计这个复杂的流程反而得不偿失。

    ===========================================================================

    上面的说明 摘抄于 ,我看了后还是有些懵。仔细看了https://blog.csdn.net/u010425776/article/details/79516298, 之后,我明白了一些。

    消息生产过程的可靠性保证

    在系统A处理任务A前,首先向消息中间件发送一条消息
    消息中间件收到后将该条消息持久化,但并不投递。此时下游系统B仍然不知道该条消息的存在。
    消息中间件持久化成功后,便向系统A返回一个确认应答;
    系统A收到确认应答后,则可以开始处理任务A;
    任务A处理完成后,向消息中间件发送Commit请求。该请求发送完成后,对系统A而言,该事务的处理过程就结束了,此时它可以处理别的任务了。 
    但commit消息可能会在传输途中丢失,从而消息中间件并不会向系统B投递这条消息,从而系统就会出现不一致性。这个问题由消息中间件的事务回查机制完成,下文会介绍。
    消息中间件收到Commit指令后,便向系统B投递该消息,从而触发任务B的执行;
    当任务B执行完成后,系统B向消息中间件返回一个确认应答,告诉消息中间件该消息已经成功消费,此时,这个分布式事务完成。
    --------------------- 
    作者:凌澜星空 
    来源:CSDN 
    原文:https://blog.csdn.net/u010425776/article/details/79516298 
    版权声明:本文为博主原创文章,转载请附上博文链接!

    上述过程中,如果任务A处理失败,那么需要进入回滚流程,如下图所示:  

    • 若系统A在处理任务A时失败,那么就会向消息中间件发送Rollback请求。和发送Commit请求一样,系统A发完之后便可以认为回滚已经完成,它便可以去做其他的事情。
    • 消息中间件收到回滚请求后,直接将该消息丢弃,而不投递给系统B,从而不会触发系统B的任务B。

    上面所介绍的Commit和Rollback都属于理想情况,但在实际系统中,Commit和Rollback指令都有可能在传输途中丢失。那么当出现这种情况的时候,消息中间件是如何保证数据一致性呢?——答案就是超时询问机制。

     

    系统A除了实现正常的业务流程外,还需提供一个事务询问的接口,供消息中间件调用。当消息中间件收到一条事务型消息后便开始计时,如果到了超时时间也没收到系统A发来的Commit或Rollback指令的话,就会主动调用系统A提供的事务询问接口询问该系统目前的状态。该接口会返回三种结果:

      • 提交 
        若获得的状态是“提交”,则将该消息投递给系统B。
      • 回滚 
        若获得的状态是“回滚”,则直接将条消息丢弃。
      • 处理中 
        若获得的状态是“处理中”,则继续等待
    消息中间件的超时询问机制能够防止上游系统因在传输过程中丢失Commit/Rollback指令而导致的系统不一致情况,而且能降低上游系统的阻塞时间,
    上游系统只要发出Commit/Rollback指令后便可以处理其他任务,无需等待确认应答。而Commit/Rollback指令丢失的情况通过超时询问机制来弥补,
    这样大大降低上游系统的阻塞时间,提升系统的并发度。
    --------------------- 作者:凌澜星空 来源:CSDN 原文:https://blog.csdn.net/u010425776/article/details/79516298 版权声明:本文为博主原创文章,转载请附上博文链接!

    系统A发送消息的操作应该是同步的,因为我们需要获取消息的地址,否则后面就无法进行消息更新和确认或取消了。 但是呢,这一步骤,如前所述,也是可能出现问题的,也就是无法区分前述情况1、2、3。 但是呢,这个也不要紧的, 因为 消息必须要确认后, 后面的系统才会进行消费。 如果出现情况3,那么我们 尽可以的把 这个待确认的消息丢弃。 而系统A 因为无法收到mq 的反馈, 不会进行下一步, 也可以保证整个系统的 一致性。

     

    下面来说一说消息投递(消费)过程的可靠性保证。

    当上游系统执行完任务并向消息中间件提交了Commit指令后,便可以处理其他任务了,此时它可以认为事务已经完成,接下来消息中间件一定会保证消息被下游系统成功消费掉!那么这是怎么做到的呢?这由消息中间件的投递流程来保证。

    消息中间件向下游系统投递完消息后便进入阻塞等待状态,下游系统便立即进行任务的处理,任务处理完成后便向消息中间件返回应答。消息中间件收到确认应答后便认为该事务处理完毕!

    如果消息在投递过程中丢失,

    或消息的确认应答在返回途中丢失,

    那么消息中间件在等待确认应答超时之后就会重新投递,直到下游消费者返回消费成功响应为止。当然,一般消息中间件可以设置消息重试的次数和时间间隔,比如:当第一次投递失败后,每隔五分钟重试一次,一共重试3次。如果重试3次之后仍然投递失败,那么这条消息就需要人工干预。


    有的同学可能要问:消息投递失败后为什么不回滚消息,而是不断尝试重新投递?

    这就涉及到整套分布式事务系统的实现成本问题。
    我们知道,当系统A将向消息中间件发送Commit指令后,它便去做别的事情了。如果此时消息投递失败,需要回滚的话,就需要让系统A事先提供回滚接口,这无疑增加了额外的开发成本,业务系统的复杂度也将提高。对于一个业务系统的设计目标是,在保证性能的前提下,最大限度地降低系统复杂度,从而能够降低系统的运维成本。

    ————  如果不断重试, 还是失败了, 那么就需要想想其他方法了,比如发通知然后人工介入啊等等。。

    不知大家是否发现,上游系统A向消息中间件提交Commit/Rollback消息采用的是异步方式,也就是当上游系统提交完消息后便可以去做别的事情,接下来提交、回滚就完全交给消息中间件来完成,并且完全信任消息中间件,认为它一定能正确地完成事务的提交或回滚。然而,消息中间件向下游系统投递消息的过程是同步的。也就是消息中间件将消息投递给下游系统后,它会阻塞等待,等下游系统成功处理完任务返回确认应答后才取消阻塞等待。为什么这两者在设计上是不一致的呢?

    首先,上游系统和消息中间件之间采用异步通信是为了提高系统并发度。业务系统直接和用户打交道,用户体验尤为重要,因此这种异步通信方式能够极大程度地降低用户等待时间。此外,异步通信相对于同步通信而言,没有了长时间的阻塞等待,因此系统的并发性也大大增加。但异步通信可能会引起Commit/Rollback指令丢失的问题,这就由消息中间件的超时询问机制来弥补。

    那么,消息中间件和下游系统之间为什么要采用同步通信呢?

    异步能提升系统性能,但随之会增加系统复杂度;而同步虽然降低系统并发度,但实现成本较低。因此,在对并发度要求不是很高的情况下,或者服务器资源较为充裕的情况下,我们可以选择同步来降低系统的复杂度。
    我们知道,消息中间件是一个独立于业务系统的第三方中间件,它不和任何业务系统产生直接的耦合,它也不和用户产生直接的关联,它一般部署在独立的服务器集群上,具有良好的可扩展性,所以不必太过于担心它的性能,如果处理速度无法满足我们的要求,可以增加机器来解决。而且,即使消息中间件处理速度有一定的延迟那也是可以接受的,因为前面所介绍的BASE理论就告诉我们了,我们追求的是最终一致性,而非实时一致性,因此消息中间件产生的时延导致事务短暂的不一致是可以接受的。
    ---------------------

    为什么可靠消息属于 异步确保? 我们可以看到 系统A发送commit、rollback 都是 异步发送的, 也就是直接发送,但不获取任何反馈结果。 也大概就是为什么称作 “异步确保” 的原因吧!

    示例

    public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, Object arg) throws MQClientException {
            if (null == tranExecuter) {
                throw new MQClientException("tranExecutor is null", (Throwable)null);
            } else {
                Validators.checkMessage(msg, this.defaultMQProducer);
                SendResult sendResult = null;
                MessageAccessor.putProperty(msg, "TRAN_MSG", "true");
                MessageAccessor.putProperty(msg, "PGROUP", this.defaultMQProducer.getProducerGroup());
    
                try {
                    //这里执行第一次发送消息,也就是预发送,并获取sendResult,这里包含msg的所有消息
                    sendResult = this.send(msg);
                } catch (Exception var10) {
                    throw new MQClientException("send message Exception", var10);
                }
    
                LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
                Throwable localException = null;
                //根据预发送消息的状态做不同的处理,这里主要看SEND_OK
                switch(sendResult.getSendStatus()) {
                case SEND_OK:
                    try {
                        if (sendResult.getTransactionId() != null) {
                            msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                        }
    
    // 这里做第二步,执行业务逻辑,即本地事物,
    //具体的本地事物在LocalTransactionExecuter参数的实现类中,
    //需要根据自己的业务逻辑去写,下面的//tranExecuter.executeLocalTransactionBranch(msg, arg);会执行实
    //现类中的executeLocalTransactionBranch业务。
                        localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
                        if (null == localTransactionState) {
                            localTransactionState = LocalTransactionState.UNKNOW;
                        }
    
                        if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                            this.log.info("executeLocalTransactionBranch return {}", localTransactionState);
                            this.log.info(msg.toString());
                        }
                    } catch (Throwable var9) {
                        this.log.info("executeLocalTransactionBranch exception", var9);
                        this.log.info(msg.toString());
                        localException = var9;
                    }
                    break;
                case FLUSH_DISK_TIMEOUT:
                case FLUSH_SLAVE_TIMEOUT:
                case SLAVE_NOT_AVAILABLE:
                    localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                }
    
                try {
    // 这里的方法,其中的localTransactionState是第二次执行业务逻辑的结果
    //可以根据这个结果,知道本地事物执行的成功还是失败。或者是异常localException,
    //这样可以根据第一次发送消息的结果sendResult,去修改mq中第一次发送消息的状态,完成第三步操作。
                    this.endTransaction(sendResult, localTransactionState, localException);
                } catch (Exception var8) {
                    this.log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", var8);
                }
    
                TransactionSendResult transactionSendResult = new TransactionSendResult();
                transactionSendResult.setSendStatus(sendResult.getSendStatus());
                transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
                transactionSendResult.setMsgId(sendResult.getMsgId());
                transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
                transactionSendResult.setTransactionId(sendResult.getTransactionId());
                transactionSendResult.setLocalTransactionState(localTransactionState);
                return transactionSendResult;
            }
        }
    
        
    public void endTransaction(SendResult sendResult, LocalTransactionState localTransactionState, Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    
    // 获取第一次发送消息的id
            MessageId id;
            if (sendResult.getOffsetMsgId() != null) {
                id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
            } else {
                id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
            }
    
        //获取事物id
            String transactionId = sendResult.getTransactionId();
            String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
            EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
            requestHeader.setTransactionId(transactionId);
            requestHeader.setCommitLogOffset(id.getOffset());
    
           //根据本地事物执行状态localTransactionState,告知mq修改状态
    
            switch(localTransactionState) {
            case COMMIT_MESSAGE:
                requestHeader.setCommitOrRollback(Integer.valueOf(8));
                break;
            case ROLLBACK_MESSAGE:
                requestHeader.setCommitOrRollback(Integer.valueOf(12));
                break;
            case UNKNOW:
                requestHeader.setCommitOrRollback(Integer.valueOf(0));
            }
    
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
            requestHeader.setMsgId(sendResult.getMsgId());
            String remark = localException != null ? "executeLocalTransactionBranch exception: " + localException.toString() : null;
    //具体执行第三步完成整个事务。      
      this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, (long)this.defaultMQProducer.getSendMsgTimeout());
    }
     
     
    作者:时之令
    链接:https://www.jianshu.com/p/8c997d0917c6
    來源:简书
    简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
    View Code

    从代码看, 这个处理有些复杂,或许我们需要把rocketmq 的文档和 api 仔细看看。

    参考:

    作者:凌澜星空
    来源:CSDN
    原文:https://blog.csdn.net/u010425776/article/details/79516298
    版权声明:本文为博主原创文章,转载请附上博文链接!

  • 相关阅读:
    Running MYSQL 5.7 By Bash On Ubuntu On Windows:ERROR 2002 (HY000): Can't connect to local MySQL server through socket '/var/run/mysqld/mysqld.sock' (2)
    MiniDao FreeMarker Cache 缓存问题
    Minidao FreeMarker 数组
    插入排序实例
    Binutils工具集中的一些比较常用的工具
    交叉编译工具简介
    TQ2440触摸屏
    对IIC总线时序的一点理解以及ACK和NACK(NAK)
    UART,SPI,IIC的一点理解
    linux中vi显示中文乱码的问题
  • 原文地址:https://www.cnblogs.com/FlyAway2013/p/10124528.html
Copyright © 2011-2022 走看看