zoukankan      html  css  js  c++  java
  • RocketMQ概览

    核心概念

    Topic:消息主题,一级消息类型,通过Topic对消息进行分类。

    Tag:消息标签,二级消息类型,用来进一步区分某个Topic下的消息分类。

    Message ID:消息的全局唯一标识,由消息队列RocketMQ版系统自动生成,唯一标识某条消息。

    Message Key:消息的业务标识,由消息生产者(Producer)设置,唯一标识某个业务逻辑。

    Producer:消息生产者,也称为消息发布者,负责生产并发送消息。

    Consumer:消息消费者,也称为消息订阅者,负责接收并消费消息。可分为两类:

    • Push Consumer:消息由消息队列RocketMQ版推送至Consumer。
    • Pull Consumer:该类Consumer主动从消息队列RocketMQ版拉取消息。目前仅TCP Java SDK支持该类Consumer。

    Group:一类Producer或Consumer,这类Producer或Consumer通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。

    集群消费:一个Group ID所标识的所有Consumer平均分摊消费消息。

                     例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在集群消费模式下每个实例平均分摊,只消费其中的3条消息。

    广播消费:一个Group ID所标识的所有Consumer都会各自消费某条消息一次。

                     例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在广播消费模式下每个实例都会各自消费9条消息。

    消息收发模型

    消息队列RocketMQ版支持发布和订阅模型,消息生产者应用创建Topic并将消息发送到Topic。消费者应用创建对Topic的订阅以便从其接收消息。通信可以是一对多(扇出)、多对一(扇入)和多对多。

    生产者集群:用来表示发送消息应用,一个生产者集群下包含多个生产者实例,可以是多台机器,也可以是一台机器的多个进程,或者一个进程的多个生产者对象。

    一个生产者集群可以发送多个Topic消息。发送分布式事务消息时,如果生产者中途意外宕机,消息队列RocketMQ版服务端会主动回调生产者集群的任意一台机器来确认事务状态。

     

    消费者集群:用来表示消费消息应用,一个消费者集群下包含多个消费者实例,可以是多台机器,也可以是多个进程,或者是一个进程的多个消费者对象。

    一个消费者集群下的多个消费者以均摊方式消费消息。如果设置的是广播方式,那么这个消费者集群下的每个实例都消费全量数据。

    一个消费者集群对应一个Group ID,一个Group ID可以订阅多个Topic

    集群消费和广播消费

    消息队列RocketMQ版是基于发布或订阅模型的消息系统。消费者,即消息的订阅方订阅关注的Topic,以获取并消费消息。由于消费者应用一般是分布式系统,以集群方式部署,因此消息队列RocketMQ版约定以下概念:

    集群:使用相同Group ID的消费者属于同一个集群。同一个集群下的消费者消费逻辑必须完全一致(包括Tag的使用)。
    集群消费:当使用集群消费模式时,消息队列RocketMQ版认为任意一条消息只需要被集群内的任意一个消费者处理即可。
    广播消费:当使用广播消费模式时,消息队列RocketMQ版会将每条消息推送给集群内所有注册过的消费者,保证消息至少被每个消费者消费一次。

    集群消费模式

    适用场景
    适用于消费端集群化部署,每条消息只需要被处理一次的场景。此外,由于消费进度在服务端维护,可靠性更高。具体消费示例如下图所示。

    注意事项

    • 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
    • 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上。

    广播消费模式

    适用场景
    适用于消费端集群化部署,每条消息需要被集群下的每个消费者处理的场景。具体消费示例如下图所示。

    注意事项

    • 广播消费模式下不支持顺序消息。
    • 广播消费模式下不支持重置消费位点。
    • 每条消息都需要被相同订阅逻辑的多台机器处理。
    • 消费进度在客户端维护,出现重复消费的概率稍大于集群模式。
    • 广播模式下,消息队列RocketMQ版保证每条消息至少被每台客户端消费一次,但是并不会重投消费失败的消息,因此业务方需要关注消费失败的情况。
    • 广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
    • 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
    • 广播模式下服务端不维护消费进度,所以消息队列RocketMQ版控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。

    系统部署架构

    图中所涉及到的概念如下所述:

    • Name Server:是一个几乎无状态节点,可集群部署,在消息队列RocketMQ版中提供命名服务,更新和发现Broker服务。
    • Broker:消息中转角色,负责存储消息,转发消息。分为Master Broker和Slave Broker,一个Master Broker可以对应多个Slave Broker,但是一个Slave Broker只能对应一个Master Broker。Broker启动后需要完成一次将自己注册至Name Server的操作;随后每隔30s定期向Name Server上报Topic路由信息。
    • 生产者:与Name Server集群中的其中一个节点(随机)建立长链接(Keep-alive),定期从Name Server读取Topic路由信息,并向提供Topic服务的Master Broker建立长链接,且定时向Master Broker发送心跳。
    • 消费者:与Name Server集群中的其中一个节点(随机)建立长连接,定期从Name Server拉取Topic路由信息,并向提供Topic服务的Master Broker、Slave Broker建立长连接,且定时向Master Broker、Slave Broker发送心跳。Consumer既可以从Master Broker订阅消息,也可以从Slave Broker订阅消息,订阅规则由Broker配置决定。

    消息类型

      

    普通消息

    消息队列RocketMQ版中无特性的消息,区别于有特性的定时和延时消息、顺序消息和事务消息。

    定时和延时消息

    允许消息生产者对指定消息进行定时(延时)投递,最长支持40天。

    顺序消息

    允许消息消费者按照消息发送的顺序对消息进行消费。

    事务消息

    实现类似X或Open XA的分布事务功能,以达到事务最终一致性状态。

    普通消息是指消息队列RocketMQ版中无特性的消息,区别于有特性的定时和延时消息、顺序消息和事务消息。

    定时消息:Producer将消息发送到消息队列RocketMQ版服务端,但并不期望立马投递这条消息,而是推迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息。

    延时消息:Producer将消息发送到消息队列RocketMQ版服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。

    顺序消息(FIFO消息)是消息队列RocketMQ版提供的一种严格按照顺序来发布和消费的消息。顺序发布和顺序消费是指对于指定的一个Topic,生产者按照一定的先后顺序发布消息;消费者按照既定的先后顺序订阅消息,即先发布的消息一定会先被客户端接收到。

    顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。

    • 全局顺序 对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。 适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景

    • 分区顺序 对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。 适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。

    对于指定的一个Topic,所有消息根据Sharding Key进行区块分区。同一个分区内的消息按照严格的FIFO顺序进行发布和消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Key是完全不同的概念。

    • 用户注册需要发送发验证码,以用户ID作为Sharding Key,那么同一个用户发送的消息都会按照发布的先后顺序来消费。
    • 电商的订单创建,以订单ID作为Sharding Key,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。
    producer.start();
            for (int i = 0; i < 1000; i++) {
                String orderId = "biz_" + i % 10;
                Message msg = new Message(
                        // Message所属的Topic。
                        "Order_global_topic",
                        // Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
                        "TagA",
                        // Message Body,可以是任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
                        "send order global msg".getBytes()
                );
                // 设置代表消息的业务关键属性,请尽可能全局唯一。
                // 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。
                // 注意:不设置也不会影响消息正常收发。
                msg.setKey(orderId);
                // 分区顺序消息中区分不同分区的关键字段,Sharding Key与普通消息的key是完全不同的概念。
                // 全局顺序消息,该字段可以设置为任意非空字符串。
                String shardingKey = String.valueOf(orderId);
                try {
                    SendResult sendResult = producer.send(msg, shardingKey);
                    // 发送消息,只要不抛异常就是成功。
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                    }
                }
                catch (Exception e) {
                    // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                    e.printStackTrace();
                }
            }
            // 在应用退出前,销毁Producer对象。
            // 注意:如果不销毁也没有问题。
     producer.shutdown();

    事务消息适用于所有对数据最终一致性有强需求的场景

    • 事务消息:消息队列RocketMQ版提供类似X或Open XA的分布式事务功能,通过消息队列RocketMQ版事务消息能达到分布式事务的最终一致。
    • 半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了消息队列RocketMQ版服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
    • 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列RocketMQ版服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。

    事务消息发送步骤如下:

    1. 发送方将半事务消息发送至消息队列RocketMQ版服务端。
    2. 消息队列RocketMQ版服务端将消息持久化成功之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事务消息。
    3. 发送方开始执行本地事务逻辑。
    4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit或是Rollback),服务端收到Commit状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到Rollback状态则删除半事务消息,订阅方将不会接受该消息。

    事务消息回查步骤如下:

    1. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
    2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
    3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。

    RocketMQ最佳实践


    Topic与Tag

    Topic和Tag的关系如下图

    到底什么时候该用Topic,什么时候该用Tag?

    建议从以下几个方面进行判断:

    • 消息类型是否一致:如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的Topic,无法通过Tag进行区分。
    • 业务是否相关联:没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的Topic进行区分;而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用Tag进行区分。
    • 消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市24小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的Topic进行区分。
    • 消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的Topic。

    总的来说,针对消息分类,您可以选择创建多个Topic,或者在同一个Topic下创建多个Tag。但通常情况下,不同的Topic之间的消息没有必然的联系,而Tag则用来区分同一个Topic下相互关联的消息,例如全集和子集的关系、流程先后的关系。

     

    消息幂等

    当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这整个过程就可实现消息幂等。

    处理方法

    因为不同的Message ID对应的消息内容可能相同,有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以Message ID作为处理依据。最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息Key设置。

    以支付场景为例,可以将消息的Key设置为订单号,作为幂等处理的依据。具体代码示例如下:

    Message message = new Message();
    message.setKey("ORDERID_100");
    SendResult sendResult = producer.send(message);     

    消费者收到消息时可以根据消息的Key,即订单号来实现消息幂等:

    consumer.subscribe("ons_test", "*", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            String key = message.getKey()
            // 根据业务唯一标识的Key做幂等处理。
        }
    }); 

    订阅关系一致

    订阅关系一致指的是同一个消费者Group ID下所有Consumer实例所订阅的Topic、Group ID、Tag必须完全一致。一旦订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。

    消息队列RocketMQ版里的一个消费者Group ID代表一个Consumer实例群组。对于大多数分布式应用来说,一个消费者Group ID下通常会挂载多个Consumer实例。

    由于消息队列RocketMQ版的订阅关系主要由Topic+Tag共同组成,因此,保持订阅关系一致意味着同一个消费者Group ID下所有的实例需在以下两方面均保持一致:

    • 订阅的Topic必须一致
    • 订阅的Topic中的Tag必须一致(包括Tag的数量和Tag的顺序)

    正确订阅关系

    错误订阅关系图片

    消息堆积和延迟问题

    消息处理流程中,如果客户端的消费速度跟不上服务端的发送速度,未处理的消息会越来越多,这部分消息就被称为堆积消息。消息出现堆积进而会造成消息消费延迟。以下场景需要重点关注消息堆积和延迟的问题:

    • 业务系统上下游能力不匹配造成的持续堆积,且无法自行恢复。
    • 业务系统对消息的消费实时性要求较高,即使是短暂的堆积造成的消息延迟也无法接受。

    客户端消费原理

    通过以上客户端消费原理可以看出,消息堆积的主要瓶颈在于本地客户端的消费能力,即消费耗时和消费并发度。想要避免和解决消息堆积问题,必须合理的控制消费耗时和消息并发度,其中消费耗时的优先级高于消费并发度,必须先保证消费耗时的合理性,再考虑消费并发度问题。

     

    消费耗时

    影响消费耗时的消费逻辑主要分为CPU内存计算和外部I/O操作,通常情况下代码中如果没有复杂的递归和循环的话,内部计算耗时相对外部I/O操作来说几乎可以忽略。外部I/O操作通常包括如下业务逻辑:

    • 读写外部数据库,例如Mysql数据库读写。
    • 读写外部缓存等系统,例如Redis读写。
    • 下游系统调用,例如Dubbo调用或者下游HTTP接口调用。

    这类外部调用的逻辑和系统容量您需要提前梳理,掌握每个调用操作预期的耗时,这样才能判断消费逻辑中I/O操作的耗时是否合理。通常消费堆积都是由于这些下游系统出现了服务异常、容量限制导致的消费耗时增加。

    例如:某业务消费逻辑中需要写一条数据到数据库,单次消费耗时为1 ms,平时消息量小未出现异常。业务侧进行大促活动时,写数据库TPS爆发式增长,并很快达到数据库容量限制,导致消费单条消息的耗时增加到100 ms,业务侧可以明显感受到消费速度大幅下跌。此时仅通过调整消息队列RocketMQ版SDK的消费并发度并不能解决问题,需要对数据库容量进行升配才能从根本上提高客户端消费能力。

     

    消费并发度

    RocketMQ并发度计算方法如下表所示。

    消息类型消费并发度
    普通消息 单节点线程数*节点数量

    定时和延时消息
    事务消息
    顺序消息 Min(单节点线程数*节点数量,分区数)

    客户端消费并发度由单节点线程数和节点数量共同决定,一般情况下需要优先调整单节点的线程数,若单机硬件资源达到上限,则必须通过扩容节点来提高消费并发度。

    如何避免消息堆积和延迟

    为了避免在业务使用时出现非预期的消息堆积和延迟问题,您需要在前期设计阶段对整个业务逻辑进行完善的排查和梳理。整理出正常业务运行场景下的性能基线,才能在故障场景下迅速定位到阻塞点。其中最重要的就是梳理消息的消费耗时和消息消费的并发度。

    • 梳理消息的消费耗时
      通过压测获取消息的消费耗时,并对耗时较高的操作的代码逻辑进行分析。梳理消息的消费耗时需要关注以下信息:
      • 消息消费逻辑的计算复杂度是否过高,代码是否存在无限循环和递归等缺陷。
      • 消息消费逻辑中的I/O操作(如:外部调用、读写存储等)是否是必须的,能否用本地缓存等方案规避。
      • 消费逻辑中的复杂耗时的操作是否可以做异步化处理,如果可以是否会造成逻辑错乱(消费完成但异步操作未完成)。
    • 设置消息的消费并发度
      1. 逐步调大线程的单个节点的线程数,并关测节点的系统指标,得到单个节点最优的消费线程数和消息吞吐量。
      2. 得到单个节点的最优线程数和消息吞吐量后,根据上下游链路的流量峰值计算出需要设置的节点数,节点数=流量峰值/单线程消息吞吐量。
    1. 提高消费并行度
    • 同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订 阅队列数的 Consumer 实例无效)。可以通过加机器,或者在已有机器启动多个进程的方式。
    • 提高单个 Consumer 的消费并行线程,通过修改注解 @RocketMQMessageListener 中参数 consumeThreadMin、consumeThreadMax实现。
    1. 批量方式消费 某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer 的 consumeMessageBatchMaxSize 返个参数,默认是 1 ,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。

    如何处理消息堆积

    确认消息的消费耗时是否合理。

    • 若查看到消费耗时较长,则需要查看客户端堆栈信息排查具体业务逻辑。
    • 若查看到消费耗时正常,则有可能是因为消费并发度不够导致消息堆积,需要逐步调大消费线程或扩容节点来解决。

    查看客户端堆栈信息。只需要关注线程名为ConsumeMessageThread的线程,这些都是业务消费消息的逻辑。

    常见的异常堆栈信息如下:

    示例一:空闲无堆积的堆栈。

    消费空闲情况下消费线程都会处于WAITING状态等待从消费任务队里中获取消息。

    示例二:消费逻辑有抢锁休眠等待等情况。
    消费线程阻塞在内部的一个睡眠等待上,导致消费缓慢。

    示例三:消费逻辑操作数据库等外部存储卡住。
    消费线程阻塞在外部的HTTP调用上,导致消费缓慢。

    针对某些特殊业务场景,如果消息堆积已经影响到业务运行,且堆积的消息本身可以丢弃,您可以通过重置消费位点跳过这些堆积的消息做到快速恢复。

    Key的使用

    每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消 息创建索引(哈希索引),应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。由于是 哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。

      // 订单Id
      String orderId = "20034568923546";
      message.setKeys(orderId);

    规范&特性

    Group

    • Group ID必须以“GID_”或者“GID-”开头,长度限制为7~64个字符,只能包含英文、数字、短横线(-)以及下划线(_)。
    • 消费者必须有对应的Group ID,生产者不做强制要求。

    Topic

    • 同一实例下Topic名称必须唯一,不同实例间的Topic名称可以重复。
    • Topic名称长度限制为3~64个字符,只能包含英文、数字、短横线(-)以及下划线(_)。

    死信消息

    • 不会再被消费者正常消费。
    • 有效期与正常消息相同,均为3天,3天后会被自动删除。因此,请在死信消息产生后的3天内及时处理。

    消息

    • 普通和顺序消息:4 MB
    • 事务和定时或延时消息:64 KB
    • 消息最多保留3天,超过时间将自动滚动删除。
    • 支持重置消费3天之内任何时间点的消息。
    • 定时和延时消息的延时时长可设置40天内的任何时刻,超过40天消息发送将失败。

    支持的队列数

    • RocketMQ 单机支持最高 5 万个队列,性能稳定。

    重试

         Producer 的 send 方法本身支持内部重试,重试逻辑如下:

    • 默认重试 3 次(同步发送为 3 次,异步发送为 3 次)。
    • 如果本身向broker发送消息产生超时异常,就不会再重试。
  • 相关阅读:
    《剑指offer》JavaScript版(4-6题)
    HDU 4906 Our happy ending(2014 Multi-University Training Contest 4)
    POJ 1436 Horizontally Visible Segments
    FOJ 2105 Digits Count
    HDU 4890 One to Four(2014 Multi-University Training Contest 3)
    HDU 4888 Redraw Beautiful Drawings(2014 Multi-University Training Contest 3)
    HDU 4893 Wow! Such Sequence!(2014 Multi-University Training Contest 3)
    POJ 3225 Help with Intervals
    HDU 1698 Just a Hook
    POJ 2886 Who Gets the Most Candies?
  • 原文地址:https://www.cnblogs.com/erichi101/p/14666135.html
Copyright © 2011-2022 走看看