zoukankan      html  css  js  c++  java
  • RocketMQ初探

    一、MQ的应用场景

      1.1 异步解偶

       在订单系统中同步调用支付系统,库存系统,物流系统。当其中任意一个系统异常,则影响整个业务流程。引入MQ后,订单系统将消息发往MQ,由MQ将消息推送到下游系统。订单系统不需要关心后续的执行情况,整个下单流程已完成。倘下游系统异常,则由MQ负责重试。

      1.2 削峰填谷

      上游系统尽可能多的接收订单,将数据发往MQ,由于整个系统的处理能力有限,MQ中的消息将分批次的交由下游系统处理,虽然整个流程较平时会有延迟,但胜在用户依然能正常下单,而不是返回失败

      1.3 数据分发

      将一份数据分发给多个下游系统

      1.4 异步回调

      1.5其他

        1.5.1 顺序收发:保证消息的先进先出

        1.5.2 分布式事务一致性:MQ事务消息  

    二、MQ的优缺点

      优点:解偶、数据分发、削峰

      缺点

        1. 可用性降低(外部依赖越多稳定性越差,如何保证高可用?)

        2. 复杂度提高(需要处理重复消费问题,消息丢失问题,消息传递的顺序性)

        3. 一致性问题

    三、消息类型

      3.1 普通消息

        同步发送(发送并等待MQ返回发送结果),异步发送(发送者提供异步回调接口由MQ调用),单向发送(发送者不关心发送结果)  

      3.2 顺序消息

        对于同一个Topic,发布者按照什么顺序发布消息,消费者会按照同样的顺序收到消息。

        顺序消息分为全局顺序消息和分区顺序消息

        全局顺序:同一个Topic,所有消息先进先出

        分区顺序:同一个Topic,根据Sharding Key进行区块分区,同一个分区内(同一个消息队列)消息先进先出

      3.3 广播消息

        集群消费模式:每条消息只被集群下的某台机器消费。MQ重试时不保证路由到同一台机器

        广播消费模式:每条消息被集群下所有消费者处理。

        1. 广播模式不支持顺序消息

        2. 广播模式不支持重置消费位点

        3. 每条消息需要被相同订阅

      3.4 延迟消息

        订单需要30分钟内支付完成,超过30分钟订单状态设置为未支付。适用于生产和消费之间有时间窗口的要求。实现思路是将延迟消息通过临时存储进行暂存,到期后投递到目标Topic中。

        对临时存储的要求:

        1.高性能,写入性能要高,关系型数据库通常不满足

        2.高可靠,消息不能丢失,需要持久化

        3. 支持排序,不同延迟的消息发送时间不同

        4. 支持长时间保存,延迟时间可能几个月。

      3.5 RocketMQ的延迟消息

        延迟消息发送后,存放在一个特殊的topic下,不同的延迟级别有不同的队列序号,队列数是由配置的延迟等级决定的,一个延迟等级一个队列。在延迟时间到达后,由定时线程读取转化成普通的消息存放在真实的Topic下。RokcetMQ的延迟时间是固定等级的,如果想要自定义需要提前修改配置。

      3.6 批量消息

        发送者一次发送多条消息,批消息的topic是相同的,但是每条消息的msgId是不一样的。一批消息大小不能超过1M

      3.7 过滤消息

        过滤方式:    

        3.7.1 TAG模式过滤

        发送消息时为每条消息设置TAG标签,同一TOPIC下根据TAG分类。 

        3.7.2 SQL表达式过滤

        发送端通过message.putUserProperty()方法设置属性  

        消费端订阅消息时通过运行SQL过滤表达式进行条件匹配:consumer.subscribe("topic",MessageSelector.bySql("age between 10 and 20"));

      3.8 事务消息

        3.8.1 概念介绍

        1. 事务消息:提供类似Open X/XA的分布式事务功能

        2. 半事务消息:不能立即投送的消息,需要发送者的二次确认

        3. 消息回查:对半事务消息,每段时间向发送者确认,发送者提供的回调接口返回三种状态:提交,回滚和未知

        3.8.2 注意事项

        1. 事务消息不支持延迟和批量

        2. 消息回查默认15次,超过则丢弃,并打印错误日志

        3. 超时时间在Broker配置文件的transactionTimeout参数设置,可以通过用户属性CHECK_IMMUNITY_TIME_IN_SECONDS改变限制

        4. 事务消息可能不止一次被检查/消费,需要做好幂等

        5. 事务消息的生产者ID不能与其他类型消息的生产者ID共享

        3.8.3 流程

        1. 发送端发送半消息。sendMessageInTransaction方法

          1. 将消息打上事务消息的标记,服务端以区分

          2. 发送半事务消息(DefaultMQProducerImpl#send方法,请求头SysFlag属性设置成TRANSACTION_PREPARED_TYPE)

        2. 服务端接收半消息。TransactionalMessageService#prepareMessage方法

          1. 将半消息的真实topic,queueId放进消息体自身的map里进行缓存(TransactionalMessageBridge#parseHalfMessageInner方法)

          2. 将半消息的topic 设置为“RMQ_SYS_TRANS_HALF_TOPIC”,queueId设置为0(TransactionalMessageBridge#parseHalfMessageInner方法)

          3. 将半消息写入commitLog进行持久化(MessageStore#putMessage方法)

        3. 发送端执行本地事务并通知执行结果。sendMessageInTransaction方法

          1. 发送成功,执行本地事务executerLocalTransaction方法,方法返回枚举LocalTransactionState(提交,回滚,未知)

          2. 执行endTransaction方法:根据本地事务执行情况,通知服务端提交还是回滚

        4. 服务端接收发送端执行结果。EndTransactionProcessor#processRequest方法

          1. 根据通知请求体中消息的文件偏移量,取出半消息,还原消息的topic和queueId

          2. 若发送端通知提交,重新存储到commitLog(sendFinalMessage方法)

          3. 不管是提交还是回滚,都需要删除半消息(deletePrepareMessage方法)。

           实际是新创建一个消息,消息体为半消息的队列偏移量,并存放到RMQ_SYS_TRANS_OP_HALF_TOPIC的队列中,这个topic下的半消息都是已经处理过的。

        5. 服务端定时回查。TransactionalMessageCheckService#onWaitEnd方法

          1. 获取RMQ_SYS_TRANS_HALF_TOPIC的半消息队列,循环遍历队列,逐个处理

          2. 做下过滤,已经处理过的不再处理(在RMQ_SYS_TRANS_OP_HALF_TOPIC的队列中)

          3. 做下过滤,最大回查次数不超过15次(默认,具体看broker的transactionCheckMax配置)和最大存储时间不超过3天

          4. 做下过滤,半消息刚放进队列,还不需要执行回查(有个超时时间,默认6秒,具体看broker的transactionTimeOut配置,用户通过赋值CHECK_IMMUNITY_TIME_IN_SECONDS属性设置)

          5. 进行回查,resolveHalfMsg(),内部有线程池提交回查任务

        6. 发送端处理回查请求。DefaultMQProducerImpl#checkTransactionState方法

          1. 根据TransactionListener#checkLocalTransaction方法获取对应事务Id的状态

          2. 根据状态通知服务端提交还是回滚

        源码分析:https://segmentfault.com/a/1190000019755235?utm_source=tag-newest,https://blog.csdn.net/hosaos/article/details/90240260,https://www.sohu.com/a/287495304_612370

          

    
    
    人生就像蒲公英,看似自由,其实身不由己。
  • 相关阅读:
    nlog学习使用
    浏览器缓存信息的清理
    安装Debugging Tools时出现错误Setup could not find the file WinSDK_amd64的处理
    双网卡下添加静态路由方法
    DELL T430进RAID的方式:, 硬盘损坏后的处理方式
    虚拟机: 虚拟机win7的激活方式(底层操作系统 为 win10) ===用windows loader
    联想 M415 I3-6100 CPU安装系统方法
    AOC 电视机T3212M 进入 工厂模式方法,修改开机启动方式
    使用WinNTSetup安装win10时提示efi part有红叉(win10安装UEFI系统安装)
    联想服务器thinkserver TS550 Raid5制作及winserver2012R2 安装过来
  • 原文地址:https://www.cnblogs.com/walker993/p/14561824.html
Copyright © 2011-2022 走看看