zoukankan      html  css  js  c++  java
  • RocketMQ开发规范

    前言

    消息队列 RocketMQ 版是基于 Apache RocketMQ 构建的低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 版既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消

    息堆积、高吞吐、可靠重试等特性。

    2020年,RocketMQ面试题 -面试题驱动RocketMQ学习

    技术选型

    顺序消息

    • 局部顺序:适用于性能要求高,以Sharding Key作为分区字段,在同一个区块中严格地按照 FIFO 原则进行消息发布和消费的场景。

    延时消息

    • RabbitMQ不支持延迟消息,Active和RocketMQ支持延迟消息。

    可靠性

    • RocketMQ支持异步/同步刷盘;异步/同步Replication。Kafka使用异步刷盘方式,异步Replication。

    支持的队列数

    • Kafka单机超过 64 个队列/分区,消息发送性能降低严重;RocketMQ 单机支持最高 5 万个队列,性能稳定。

    消息回溯

    • Kafka可以按照Offset来回溯消息;RocketMQ支持按照时间来回溯消息,精度毫秒,例如从一天 之前的某时某分某秒开始重新消费消息。

    消费失败重试机制

    • Kafka消费失败默认不支持重试;RocketMQ消费失败支持定时重试,每次重试间隔时间顺延。

    二.消息队列核心概念

    2.1 架构介绍:

        Name Server:是一个几乎无状态节点,可集群部署,在消息队列RocketMQ 版中提供命名服务,更新和发现Broker服务。

        Broker:消息中转角色,负责存储消息,转发消息。分为Master Broker和 Slave Broker,一个Master Broker可以对应多个Slave Broker,但是一个 Slave Broker 只能对应一个Master Broker。Broker启动后需要完成一次将自己注册

    至Name Server 的操作;随后每隔 30s 定期向Name Server上报 Topic 路由信息。

        生产者:与Name Server集群中的其中一个节点(随机)建立长链接(Keep-alive),定期从Name Server读取Topic路由信息,并向提供Topic服务的Master Broker建立长链接,且定时向Master Broker发送心跳。

        消费者:与 Name Server 集群中的其中一个节点(随机)建立长连接,定期从 Name Server 拉取 Topic 路由信息,并向提供 Topic 服务的 Master Broker、Slave Broker 建立长连接,且定时向 Master Broker、Slave Broker 发送心

    跳。Consumer 既可以从 Master Broker 订阅消息,也可以从 Slave Broker 订阅消息,订阅规则由 Broker 配置决定。

    2.2 基本属性介绍:

        Topic:消息主题,一级消息类型,生产者向其发送消息。

        生产者:也称为消息发布者,负责生产并发送消息至 Topic。

        消费者:也称为消息订阅者,负责从 Topic 接收并消费消息。

        消息:生产者向 Topic 发送并最终传送给消费者的数据和(可选)属性的组合。

       消息属性:生产者可以为消息定义的属性,包含 Message Key 和 Tag。

       Group:一类生产者或消费者,这类生产者或消费者通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。

    2.3 消息的收发模型:

        消息队列 RocketMQ版支持发布/订阅模型,消息生产者应用创建 Topic 并将消息发送到 Topic。消费者应用创建对 Topic 的订阅以便从其接收消息。通信可以是一对多(扇出)、多对一(扇入)和多对多。

       生产者集群:用来表示发送消息应用,一个生产者集群下包含多个生产者实例,可以是多台机器,也可以是一台机器的多个进程,或者一个进程的多个生产者对象。

       (一个生产者集群可以发送多个 Topic 消息。发送分布式事务消息时,如果生产者中途意外宕机,Broker 会主动回调生产者集群的任意一台机器来确认事务状态。)

       消费者集群:用来表示消费消息应用,一个消费者集群下包含多个消费者实例,可以是多台机器,也可以是多个进程,或者是一个进程的多个消费者对象。

       (一个消费者集群下的多个消费者以均摊方式消费消息。如果设置的是广播方式,那么这个消费者集群下的每个实例都消费全量数据。一个消费者集群对应一个 Group ID,一个 Group ID 可以订阅多个 Topic)

    三.消息中间件功能特性

        削峰填谷

         诸如秒杀、618大促、双十一等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,消息队列可提供削峰填谷的服务来解决该问题。

        异步解耦

          微服务系统架构,整体业务系统庞大而且复杂,往往采用RPC框架或负载均衡,仍有可能导致系统流量分配不均,导致应用中断业务。消息队列 RocketMQ 版可实现异步通信和应用解耦,确保业务的连续性。

        顺序收发

        日常中需要保证顺序的应用场景非常多,例如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等等。与先进先出(First In First Out,缩写 FIFO)原理类似,消息队列 RocketMQ 版提供的顺序消息即保证消息 FIFO。顺序分为全局顺序、局部顺序。

       分布式事务一致性

         交易系统、支付红包等场景需要确保数据的最终一致性,大量引入消息队列 RocketMQ 版的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。

    四.消息队列适用场景

       4.1 普通消息

              简单的消息发送,目前仅提供同步消息发送实现

       4.2 顺序消息

           全局顺序:适用于性能要求不高,所有的消息严格按照FIFO原则来发布和消费的场景。

           局部顺序:适用于性能要求高,以Sharding Key作为分区字段,在同一个区块中严格地按照 FIFO 原则进行消息发布和消费的场景。

           使用顺序消息时,请注意以下几点:

           顺序消息暂不支持广播模式。

           建议同一个Group ID只对应一种类型的Topic,即不同时用于顺序消息和无序消息的收发。

           顺序消息不支持异步发送方式,否则将无法严格保证顺序。

           对于全局顺序消息,建议消息不要有阻塞。同时运行多个实例,是为了防止工作实例意外退出而导致业务中断。当工作实例退出时,其他实例可以立即接手工作,不会导致业务中断,实际工作的只会有一个实例。

      4.3 延时消息(消息组件暂不提供)

          消息生产和消费有时间窗口要求:比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略。

         通过消息触发一些定时任务,比如在某一固定时间点向用户发送提醒消息。

      4.4 消费消息

         消息订阅关系一致指的是同一个消费Group ID下所有Consumer 实例的处理逻辑必须完全一致。一旦订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。由于消息队列 RocketMQ 版的订阅关系主要由 Topic + Tag 共同组成,因此,保持订阅关系一致意味着同一个消费者 Group ID 下所有的实例需在以下两方面均保持一致:

         A、订阅的Topic必须一致

         B、订阅的Topic中的Tag必须一致

      4.5 消息幂等

        4.5.1 当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这整个过程就可实现消息幂等。

        4.5.2消息重复的场景如下:

           发送时消息重复

           当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

         投递时消息重复

           消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,消息队列 RocketMQ 版的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且Message ID也相同的消息。

           负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及消费者应用重启)

          当消息队列的Broker或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

       4.5.3 处理方式:

           C1:根据消息组件msgId+msgType作第一层唯一校验;

           C2:业务系统根据业务标识去重;

           消息去重的实现机制:可以采用业务DB或者缓存中间件Redis ;

    五.消息队列命名规范

        在这里,我简单介绍咱们对topic的定义规范以及消费者群组的定义规范,通过标准化的配置,利于咱们对相应的业务场景做监控

       (1)【强制】: 消息队列或者消费者组名称应该具备可读性和可管理性

       消息队列名称示例:MQ_hub2lmis_create_wo_pro(组成规则由3段组成,MQ:消息队列简称
       hub2lmis_create_wo:标识具体的业务语义。pro:标识该消息队列归属于生产环境)
    参考2.2 基本属性介绍.topic
    消费者组名称示例:CG_hub2lmis_create_wo_pro(组成规则同上)
    参考2.2 基本属性介绍.Group

    (2)【强制】:消息队列或者消费者组名称不要包含特殊字符

    反例:包含空格、换行、单双引号以及其他转义字符

    (3)【强制】: 不建议发送过大的消息 ,目前默认发送消息体大于 4MB 会被压缩,大于4MB的MQ报文数据通过文件服务存储到OSS中。

    六.消息中间件与服务治理结合

       接入服务治理的好处:

         6.1 动态启停消费者群组,不用重复的变更配置、发布应用,安全可靠;

         6.2 动态调整消费者群组监听线程,避免因线程堆积等原因导致应用进程奔溃;

         6.3 客户端简化配置参数,各方只需关注自己的业务逻辑实现即可;

    七.其他事项

       7.1 消息组件自身重试机制

          从使用RocketMQ开始,很多同事都在反馈RMQ的重试机制到底是如何的?

          RMQ是阶梯性的重试补偿,补偿频率为(10s,30s,1min,2min,3min,5min,6min,7min,8min,9min,10min,20min,30min,1h,2h),验证结果如下:

           

          此外,当应用服务重启后,消费失败的队列,会继续得到补偿,补偿频率仍为(10s,30s,1min,2min,3min,5min,6min,7min,8min,9min,10min,20min,30min,1h,2h)

    如下图:

        

    7.2 去消息仓库化   

        消息组件设计之初,引人了消息仓库(mongodb)来存储消息数据体,减轻rmq服务端的文件读写性能。随着新的技术栈迭代更新,现在已不在提供中央仓库,用来存放消息体。针对业务系统中大body,由业务系统自行将消息体拆分或者选择相应的消息存储仓库。

    7.3 消费者群组和消息队列关系一一对应

        一个消费者群组仅监听一个topic。否则在应用重启或者因某个队列异常,进而导致整个消费者群组出现消费异常。

               

    八、 最佳实践

       

       8.1消息组件架构图

        

      8.2消息组件数据交互图

       

      8.3 生产者

      8.3.1 发送消息注意事项

        Topic的使用

        一个业务使用一个Topic。

         Keys的使用

        每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消 息创建索引(哈希索引),应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。由于是 哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。

      // 订单Id
      String orderId = "20034568923546";
      message.setKeys(orderId);
    

       日志的打印

        消息发送成功或者失败要打印消息日志,务必要打印 SendResult 和 key 字段。

      8.3.2 消息发送失败处理方式

          Producer 的 send 方法本身支持内部重试,重试逻辑如下:

    • 默认重试 3 次(同步发送为 3 次,异步发送为 3 次)。
    • 如果本身向broker发送消息产生超时异常,就不会再重试。

       以上策略也是在一定程度上保证了消息可以发送成功。如果业务对消息可靠性要求比较高,建议应用增加相应的重试逻辑:比如调用send同步方法发送失败时,则尝试将消息存储到db,然后由后台线程定时重试,确保消息一定到达Broker。

    8.4 消费者

    8.4.1 消费过程幂等

    RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。

    8.4.2 消费速度慢的处理方式

    1. 提高消费并行度
    • 同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订 阅队列数的 Consumer 实例无效)。可以通过加机器,或者在已有机器启动多个进程的方式。
    • 提高单个 Consumer 的消费并行线程,通过修改注解 @RocketMQMessageListener 中参数 consumeThreadMin、consumeThreadMax实现。
    1. 批量方式消费 某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer 的 consumeMessageBatchMaxSize 返个参数,默认是 1 ,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。

    8.4.3 消费打印日志

    如果消息量较少,建议在消费入口方法打印消息日志,消费耗时等,方便后续排查问题。

    8.4.4 其他消费建议

    1. 关于消费者和订阅 第一件需要注意的事情是,不同的消费者组可以独立的消费一些 topic,并且每个消费者组都有自己的 消费偏移量,请确保 同一组内的每个消费者订阅信息保持一致 。
    2. 关于线程数设置 消费者使用 ThreadPoolExecutor 在内部对消息进行消费,所以你可以通过设置 consumeThreadMin 或 consumeThreadMax 来改变它。
    3. 关于消费位点 当建立一个新的消费者组时,需要决定是否需要消费已经存在于 Broker 中的历史消息 CONSUME_FROM_LAST_OFFSET 将会忽略历史消息,并消费之后生成的任何消息。 CONSUME_FROM_FIRST_OFFSET 将会消费每个存在于 Broker 中的信息。你也可以使用 CONSUME_FROM_TIMESTAMP 来消费在指定时间戳后产生的消息。

    8.4.5 订阅关系一致

    订阅关系一致指的是同一个消费者 Group 下所有 Consumer 实例的处理逻辑必须完全一致。一旦订阅 关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。

    背景信息

    RocketMQ 里的一个消费者 Group 代表一个 Consumer 实例群组。对于大多数分布式应用来说,一个 消费者 Group 下通常会挂载多个 Consumer 实例。

    由于 RocketMQ 的订阅关系主要由 Topic + Tag 共同组成,因此,保持订阅关系一致意味着同一个消费 者 Group 下所有的实例需在以下两方面均保持一致:

    • 订阅的 Topic 必须一致
    • 订阅的 Topic 中的 Tag 必须一致

    正确订阅关系图片示例

    多个 Group 订阅了多个 Topic,并且每个 Group 里的多个消费者实例的订阅关系保持了一致。

    错误订阅关系图片示例

    单个 Group 订阅了多个 Topic,但是该 Group 里的多个消费者实例的订阅关系并没有保持一致。

    九.消息组件对接举例

       9.1.1、依赖包引入

       9.1.1.1、通用SDK(maven) 

    <dependency>

          <groupId>com.cf.tic.gsdk</groupId>

          <artifactId>g-sdk</artifactId>

          <version>2.0.8</version>

    </dependency>

    9.1.1.2、通用SDK(Gradle)

    compile 'com.cf.tic.gsdk:g-sdk:2.0.8

    9.1.1.3、消息组件依赖

    <dependency>

                <groupId>com.cf.scm.common</groupId>

            <artifactId>common-message-component-rocketmq</artifactId>

                <!--spring4.x-->

                <version>1.6.0.9</version>

    <!--spring3.x-->

    <version>1.6.1.9</version>

    <!--spring-支持分布式消息去重(redis)-->
    <version>1.6.2.9</version>

    <!-- spring3.x(wms3.0)版本-->

    <version>1.6.3.9</version>

    </dependency>

    9.1.1.4、消息组件依赖

    compile ('com.cf.scm.common:common-message-component-rocketmq:1.6.0.9')

    9.2、业务代码订阅治理信息数据

      9.2.1、剔除原有的zK通知模式(若存在即关注)

         ZkDateChangeManagerImpl类中changeData()方法删除掉,governanceService.init(mqservicePath);

      9.2.2、新增应用初始化类(若应用中已存在类似的初始化代码,可忽略)

    /**

     * 模拟应用初始化

     * @author lisi

     * @create 2020/2/27

     */

    public class ApplicationInitServiceImpl {

        @Autowired

        private CallbackService payGovernanceService;

        @PostConstruct

        public void init() {

            BeanRegistry sc = BeanRegistry.getInstance();

            CallbackService cbs = payGovernanceService;

            sc.add(CallbackService.class, cbs);

        }}

    9.2.3、新增消息队列订阅关系变更通知类

    /**

     * @author lisi

     * @create 2020/2/27

     */

    @Service("payGovernanceService")

    @Slf4j

    public class PayGovernanceServiceImpl implements CallbackService

    {

        @Autowired

        private GovernancePullServiceImpl governancePullService;

       

        @Override

        public String getCallbackName() {

            return null;

        }

        /**

         * 数据校验是否通过

         */

        public boolean validate(Map<String, Object> map) {

            //默认为false,需开发同事咨询处理

            return true;

        }

        @Override

        public void process(Map<String, Object> map) {

            Object o = map.get("CB_MAP_CONFIG_RESP");

            String text = (String) o;

            VersionQueryResp obj = JsonUtil.readValue(text, VersionQueryResp.class);

            String body = obj.getRuntime();

            Map<String,String> resultMap = JsonUtil.readValue(body, Map.class);

            String rmqBody =  resultMap.get("mq");

            if (StringUtil.isEmpty(rmqBody)) {

                log.warn("从服务端获取的数据为空:{}" , rmqBody);

                return;

            }

            governancePullService.init(rmqBody);

    }

    9.2.4、配置文件关联如上实现类

    <bean id="applicationInitService" class="com.cfx.scm.baseservice.pay.service.ApplicationInitServiceImpl"

              init-method="init">

        </bean>

        <bean id="pgClientLauncher" class="com.cfx.tic.gsdk.pg.PgClientLauncher" init-method="init">

            <property name="appId" value="示例"/>

            <property name="secret" value="示例"/>

            <property name="apiGateWayUrl" value="http://sit-api-base.cfx.com/api(测试网关地址)"/>

            <property name="enabled" value="true"/>

        </bean>

    9.5 业务逻辑

      9.5.1 发送消息

    业务类中需引入发送生产者对象

    A、@Autowired

        private RocketMQProducerServer producerServer ;

    B、发送有二级标签的消息数据

    producerServer.sendDataMsgConcurrently(topic,tags,MessageCommond) ;

    C、发送无二级标签的消息数据

    producerServer.sendDataMsgConcurrently(topic, MessageCommond) ;

    MessageCommond实体对象

    属性

    类型

    描述

    是否必填

    msgId

    String

    消息ID(由各个业务系统定义)

    msgType

    String

    消息类型(如 该消息队列的接口名称)

    isMsgBodySend

    Boolean

    消息体是否发送至mq

    true :消息体发送至mq

    false :消息体仅保存至mongodb,不发送到mq

    msgBody

    String

    消息对象

    sendTime

    String

    消息发送时间

    feedBackTime

    String

    消息接收反馈时间

    msgExt

    String

    消息描述<存放各自特殊需求的信息>

    splitId

    String

    分库分表id

     9.5.2 接收消息

            接入本版本服务治理后,业务系统无需考虑采用配置方式来处理topic分发给什么样的bean及method处理业务逻辑,因为这些配置信息提前就已经统一配置于服务治理平台。

            客户端仅关注拿到的Topic消息交由哪个beanId、method来处理数据即可

    9.6 消息仓库管理(暂停消息仓库使用)

    若应用开启消息数据保存至mongodb,则需在main esourcesMETA-INFspring中引入xml文件

    <import resource="classpath*:spring-cfx-mongo.xml"/>

    Zk中需配置如下参数

    msg.replica.set=10.88.27.120:27017,10.88.27.121:27017,10.88.27.122:27017

    msg.connections.per.host=500

    msg.threads.allowed.multiplier=100

    msg.connect.timeout=10000

    msg.max.wait.time=10000

    msg.socket.keep.alive=false

    msg.socket.timeout=10000

    msg.mongodb.username=testuser

    msg.mongodb.password=testpass

    msg.database.store=cfx_rocketmq

    msg.write.concern=SAFE

    9.7 全局参数配置

    属性名

    属性介绍

    默认值

    msg.iscrossaccess

    是否跨集群访问

    false

    cluster.networkmapping

    网络mapping映射关系

    mongodb.msg.isEnable

    Mongodb开关

    true

    msg.gzip

    消息是否开启压缩

    false

    三、注意事项

    3.1 若对接应用为消费者,需在spring.xml中配置如下:

    <!-- spring注入的是接口,关联的是实现类 [可以用实现类注入] -->

    <aop:config proxy-target-class="true" />

    解决如下异常:

    3.2 生产者、消费者在发送、解析消息数据时,建议使com.cfx.utilities.json下的JsonUtil工具类

    3.3 对接应用在引入相关附件文件时,需注意相关包名路径!!!

    3.4 各个对接应用在创建(发送)消息话题(topic、队列)时,需联系RocketMQ管理员,由管理员为话题创建队列(分区)数。

    3.5、接入新的消息组件,各个应用系统需提前在UAC系统中申请应用租户、租户密钥以及网关地址。

    3.6、消息组件的租户是以实例类型为基准,因此PAC同一套代码、同样的prod环境,但细分为MQService、DaemonService、adapter等各个应用,因此需要向UAC环境申请多个租户(切记!)

    3.7、队列(topic)创建规则:

    MQ_业务2业务_profile:以MQ开头、业务流程说明、环境标识。

    3.8、消费者群组示例:

    CG_业务2业务_profile:将监听的队列MQ替换成CG。

    四、消息组件应用技术点

    4.1 消息内容加密

    加密方式AES

    4.2 spring事务

    五、spring boot对接注意事项

    5.1 包扫描的问题

    SpringbootApplication 的上级目录至少是 com目录,如此消息组件中的注解类方可扫描到

    5.2 相关代码附件

    5.3 禁用springBoot默认mongodb加载

    @SpringBootApplication(exclude = {MongoAutoConfiguration.class,MongoDataAutoConfiguration.class})

    九、落地:如何让规范更好落地

    1. 对于强制规范比如命名规则可以在公共RMQ客户端SDK API中进行校验;

    2. 对于建议规则,由各个业务组架构师进行宣导,小组开发负责人做代码review时根据建议规则要求所属开发整改;

  • 相关阅读:
    Win32 开发
    Corners in C#
    swfupload在IE8下显示正常,但是单击添加按钮无反应
    Windows Script Host(WSH)
    研磨设计模式 之 中介者模式(Mediator)
    Pure GPU Computing Platform : NVIDIA CUDA Tutorial
    BattleField 2142引擎图形程序员小访谈
    利用SAH实现kD树快速分割模型实践
    给大家看一下德国的家居装潢技术,在装修房子的朋友可以欣赏一下
    Python与Microsoft Office自动化操作
  • 原文地址:https://www.cnblogs.com/liran123/p/13803872.html
Copyright © 2011-2022 走看看