zoukankan      html  css  js  c++  java
  • RocketMQ笔记

    https://github.com/apache/rocketmq/tree/master/docs/cn

    基本概念

    1.1.1.RocketMQ有三部分组成:produce,broker,consumer
    1.1.2.RocketMQ多种发送方式,同步发送、异步发送、顺序发送、单向发送。同异步需要返回确认信息
    1.1.3.两种消费形式:拉取式消费(主动性)、推动式消费(实时性)。
    1.1.4.代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
    1.1.5.两种消息模式:集群消费和广播消费。
    1.1.6.消息队列中的对象Massage通用的四个变量String topic(主题,消息的集合,一般是请求项目的标识), String tags(消息的任务类型), String keys(唯一标识), byte[] body
    特性
    1.2.1.消息顺序:全局顺序与分区顺序,都是指定某个topic执行FIFO,但是后者是按照sharding key来作为分区,分区里面的必须顺序,类似于某个订单key的创建,付款,完成
    1.2.2.消息过滤在broker实现,减少不必要的输出,但是增加了负担
    1.2.3.对于要求高可靠的地方,可以使用3.0版本开始的同步双写(影响性能),broker非正常关闭和异常Crash,OS Crash,机器暂时掉电,机器无法开机,磁盘损坏,前者没必要,后两个的在高要求下需要同步写。
    1.2.4.可回溯消费
    1.2.5.支持事务
    1.2.6.定时消息,一段时间后执行。
    1.2.7.生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证。
    1.2.8.控流
    1.2.9.重试最大次会将消息放到死信队列中


    架构设计
    2.1.1.四部分:Producer,Consumer,NameServer(Topic路由注册中心,broker管理和心跳检测机制,集群部署,互不通信),BrokerServer
    2.1.2.重点理解这句话 消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。
    2.1.3.启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
    Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。
    心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。
    设计
    2.2.1.消息存储(最复杂和重要的):RocketMQ的消息存储整体架构、PageCache与Mmap内存映射以及RocketMQ中两种不同的刷盘方式
    RocketMQ的消息存储整体架构
    1>三个文件:CommitLog(消息主体以及元数据的存储主体):起始偏移量是按照之前文件大小决定的
    ConsumeQueue
    IndexFile
    2>对文件使用页缓存,利用NIO中的FileChannel
    3>消息刷盘,同步的可靠性和异步的性能性
    2.2.2.通信机制:rocketmq-remoting网络通信模块,消息队列自定义了通信协议,在Netty基础上扩展。
    RemotingCommand类对所有数据进行封装
    通信方式:同步(sync)、异步(async)、单向(oneway)
    RPC通信采用Netty组件作为底层通信库,Reactor多线程模型,在这之上做了一些扩展和优化。
    这里的通信要涉及到了Netty和多线程 ----> https://github.com/apache/rocketmq/blob/master/docs/cn/design.md
    2.2.3.过滤方式: Tag、SQL92
    2.2.4.总体是生产和消费的负载均衡
    1>Producer端的sendLatencyFaultEnable开关实现消息发送的高可用
    2>Consumer端Push消费模式是对Pull的一种封装
    负载均衡的核心类RebalanceImpl,核心方法rebalanceByTopic(),过程需要重点理解
    2.2.5.事务默认回查15次
    2.2.6.消息查询可以按照MessageId,Message Key,前者将请求的地址封装成一个RPC请求,后者靠IndexFile索引文件。


    样例


    最佳实践
    4.1.1.生产者:
    1.一个应用尽可能用一个Topic,消息子类型则可以用tags来标识。
    2.keys的使用:每个消息在业务层面的唯一标识码(避免冲突)要设置到keys字段,方便将来定位消息丢失问题。
    3.producer在send后用SendResult接受结果,四种结果
    4.可靠性要求不高的情况可以使用oneway形式发送
    4.1.2.消费者:
    1.消费过程幂等,MQ无法避免重复消费,可以使用msgId的方式,但是消费者的主动重发和客户端的重投机制会影响msgId的唯一性
    2.消费慢的处理方式:
    1>合理提高消费的并行度:
    •同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订阅队列数的 Consumer 实例无效)。可以通过加机器,或者在已有机器启动多个进程的方式。
    •提高单个 Consumer 的消费并行线程,通过修改参数 consumeThreadMin、consumeThreadMax实现。
    2>批量消费:通过设置 consumer的 consumeMessageBatchMaxSize
    3>跳过非重要消息:丢弃不重要信息
    4>优化每条消息消费过程:
    •减少数据库交互;
    •对时延敏感的话,可以把DB部署在SSD硬盘,相比于SCSI磁盘,前者的ReturnTime会小很多。
    3.打印消费日志
    4.消费建议:
    1>不同的消费者组可独立的消费一些topic,且每个消费者组都有自己的消费偏移量,请确保同一组内的每个消费者订阅信息保持一致。
    2>有序消息可使用ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT的状态代替异常。(不可跳过稍等时间)
    3>并发消费可使用ConsumeConcurrentlyStatus.RECONSUME_LATER的状态代替异常。(可跳过并稍后消费)
    4>不建议阻塞监听器,会阻塞线程池,并最终可能会终止消费进程
    5>消费者使用ThreadPoolExecutor 消费时,可以使用setConsumeThreadMin,setConsumeThreadMax来修改
    6>消费位点:建立新的消费组的时候,CONSUME_FROM_LAST_OFFSET将会忽略历史消息,CONSUME_FROM_FIRST_OFFSET将会消费Broker的所有消息,CONSUME_FROM_TIMESTAMP可以指定时间戳后。
    4.1.3.Broker
    1.Broker分为ASYNC_MASTER、SYNC_MASTER和SLAVE,测试的时候可以不用SLAVE
    2.FlushDiskType中的SYNC_FLUSH比ASYNC_FLUSH更可靠,但是会损失性能
    3.Broker配置中的参数https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md#33-broker-%E9%85%8D%E7%BD%AE
    4.1.4.NameServer
    1.Brokers定期向每个NameServer注册路由数据
    2.NameServer为客户端,包括生产者,消费者和命令行客户端提供最新的路由信息。
    4.1.5.客户端(生产者和消费者)
    1.客户端寻址方式,最前面的最优先,推荐最后一种,部署简单,集群可以热升级
    •代码中指定Name Server地址,多个namesrv地址之间用分号分割
    producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
    consumer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
    •Java启动参数中指定Name Server地址
    -Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876
    •环境变量指定Name Server地址
    export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876
    •HTTP静态服务器寻址(默认):访问http://jmenv.tbsite.net:8080/rocketmq/nsaddr返回地址,可以在/etc/hosts修改要访问的服务器
    10.232.22.67 jmenv.taobao.net
    2.客户端配置:https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md#52-%E5%AE%A2%E6%88%B7%E7%AB%AF%E9%85%8D%E7%BD%AE
    其中包括客户端的公共配置、Producer配置、PushConsumer配置、PullConsumer配置、Message数据结构
    4.1.6.系统配置
    1.JVM选项
    当前推荐JDK1.8,-server -Xms8g -Xmx8g -Xmn4g,相同的Xms、Xmx能获得更好的性能
    不关心Broker启动时间的人可以启用它:​
    -XX:+AlwaysPreTouch
    禁用偏置锁定可能会减少JVM暂停,​
    -XX:-UseBiasedLocking
    至于垃圾回收,建议使用带JDK 1.8的G1收集器。
    -XX:+UseG1GC -XX:G1HeapRegionSize=16m
    -XX:G1ReservePercent=25
    -XX:InitiatingHeapOccupancyPercent=30
    它在我们的生产环境中具有良好的性能。另外不要把-XX:MaxGCPauseMillis的值设置太小,否则JVM将使用一个小的年轻代来实现这个目标,这将导致非常频繁的minor GC,所以建议使用rolling GC日志文件:
    -XX:+UseGCLogFileRotation
    -XX:NumberOfGCLogFiles=5
    -XX:GCLogFileSize=30m
    如果写入GC文件会增加代理的延迟,可以考虑将GC日志文件重定向到内存文件系统:
    -Xloggc:/dev/shm/mq_gc_%p.log123
    2 Linux内核参数
    •os.sh脚本在bin文件夹中列出了许多内核参数,可以进行微小的更改然后用于生产用途。下面的参数需要注意,更多细节请参考/proc/sys/vm/*的文档
    •vm.extra_free_kbytes,告诉VM在后台回收(kswapd)启动的阈值与直接回收(通过分配进程)的阈值之间保留额外的可用内存。RocketMQ使用此参数来避免内存分配中的长延迟。(与具体内核版本相关)
    •vm.min_free_kbytes,如果将其设置为低于1024KB,将会巧妙的将系统破坏,并且系统在高负载下容易出现死锁。
    •vm.max_map_count,限制一个进程可能具有的最大内存映射区域数。RocketMQ将使用mmap加载CommitLog和ConsumeQueue,因此建议将为此参数设置较大的值。(agressiveness --> aggressiveness)
    •vm.swappiness,定义内核交换内存页面的积极程度。较高的值会增加攻击性,较低的值会减少交换量。建议将值设置为10来避免交换延迟。
    •File descriptor limits,RocketMQ需要为文件(CommitLog和ConsumeQueue)和网络连接打开文件描述符。我们建议设置文件描述符的值为655350。
    •Disk scheduler,RocketMQ建议使用I/O截止时间调度器,它试图为请求提供有保证的延迟。
    消息轨迹指南:
    4.2.1.关键属性
    1.Producer端:生产实例信息、发送消息时间、消息是否发送成功、发送耗时
    2.Consumer端:消费实例信息、投递时间,投递轮次、消息是否消费成功、消费耗时
    3.Broker端:消息的Topic、消息存储位置、消息的Key值、消息的Tag值
    4.2.2.支持消息轨迹集群部署
    1.Broker端配置文件:
    brokerClusterName=DefaultCluster
    brokerName=broker-a
    brokerId=0
    deleteWhen=04
    fileReservedTime=48
    brokerRole=ASYNC_MASTER
    flushDiskType=ASYNC_FLUSH
    storePathRootDir=/data/rocketmq/rootdir-a-m
    storePathCommitLog=/data/rocketmq/commitlog-a-m
    autoCreateSubscriptionGroup=true
    ## if msg tracing is open,the flag will be true
    traceTopicEnable=true
    listenPort=10911
    brokerIP1=XX.XX.XX.XX1
    namesrvAddr=XX.XX.XX.XX:9876
    2.普通模式:Broker节点均用于存储消息轨迹数据,节点数量无要求和限制
    3.物理IO隔离模式:数据量大时,可在MQ集群中选择一个Broker节点专用于存储消息轨迹
    4.启动开启消息轨迹的Broker:nohup sh mqbroker -c ../conf/2m-noslave/broker-a.properties &
    4.2.3.保存消息轨迹的Topic定义:
    1.系统级的TraceTopic:Topic在Broker启动时会自动创建,需要Broker配置中traceTopicEnable设置为true,名称为RMQ_SYS_TRACE_TOPIC
    2.用户自定义的TraceTopic
    4.2.4.支持消息轨迹的Client客户端
    开关参数(enableMsgTrace)来实现消息轨迹是否开启
    自定义参(customizedTraceTopic)数来实现用户存储消息轨迹数据至自己创建的用户级Topic
    1.发送消息时开启消息轨迹
    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true);
    producer.setNamesrvAddr("XX.XX.XX.XX1");
    producer.start();
    try {
    {
    Message msg = new Message("TopicTest",
    "TagA",
    "OrderID188",
    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
    SendResult sendResult = producer.send(msg);
    System.out.printf("%s%n", sendResult);
    }

    } catch (Exception e) {
    e.printStackTrace();
    }
    2.订阅消息时开启消息轨迹
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true);
    consumer.subscribe("TopicTest", "*");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.setConsumeTimestamp("20181109221800");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    });
    consumer.start();
    System.out.printf("Consumer Started.%n");
    3.支持自定义存储消息轨迹Topic:将DefaultMQProducer和DefaultMQPushConsumer实例的初始化修改
    ##其中Topic_test11111需要用户自己预先创建,来保存消息轨迹;
    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true,"Topic_test11111");
    ......

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true,"Topic_test11111");
    ......
    权限控制
    4.3.1.用户访问控制:在Client客户端通过 RPCHook注入AccessKey和SecretKey签名
    -->将对应的权限控制属性(包括Topic访问权限、IP白名单和AccessKey和SecretKey签名等)设置在distribution/conf/plain_acl.yml的配置文件中
    -->Broker端对AccessKey所拥有的权限进行校验,校验不过,抛出异常; ACL客户端可以参考:org.apache.rocketmq.example.simple包下面的AclClient代码。
    4.3.2.权限控制的定义与属性值:DENY(拒绝)、ANY(PUB或者SUB权限)、PUB(发送权限)、SUB(订阅权限)
    4.3.3.关键配置文件:distribution/conf/plain_acl.yml
    4.3.4.Broker端开启ACL特性的properties配置文件内容:
    brokerClusterName=DefaultCluster
    brokerName=broker-a
    brokerId=0
    deleteWhen=04
    fileReservedTime=48
    brokerRole=ASYNC_MASTER
    flushDiskType=ASYNC_FLUSH
    storePathRootDir=/data/rocketmq/rootdir-a-m
    storePathCommitLog=/data/rocketmq/commitlog-a-m
    autoCreateSubscriptionGroup=true
    ## if acl is open,the flag will be true
    aclEnable=true
    listenPort=10911
    brokerIP1=XX.XX.XX.XX1
    namesrvAddr=XX.XX.XX.XX:9876
    4.3.5.主要流程:权限解析+权限校验
    解析:
    AccessKey:类似于用户名
    Signature:根据 SecretKey 签名得到的串
    校验:
    1.全局白名单
    2.用户白名单
    3.签名
    4.所请求的权限和所拥有的
    5.特殊: 1)特殊的请求例如 UPDATE_AND_CREATE_TOPIC 等,只能由 admin 账户进行操作;
    2)对于某个资源,如果有显性配置权限,则采用配置的权限;如果没有显性配置权限,则采用默认的权限;
    4.3.6.权限控制热加载:存储的默认实现是基于yml配置文件,不需重新启动Broker服务节点。
    4.3.7.权限控制的使用限制:
    (1)如果ACL与高可用部署(Master/Slave架构)同时启用,那么需要在Broker Master节点的distribution/conf/plain_acl.yml配置文件中 设置全局白名单信息,即为将Slave节点的ip地址设置至Master节点plain_acl.yml配置文件的全局白名单中。
    (2)如果ACL与高可用部署(多副本Dledger架构)同时启用,由于出现节点宕机时,Dledger Group组内会自动选主,那么就需要将Dledger Group组 内所有Broker节点的plain_acl.yml配置文件的白名单设置所有Broker节点的ip地址。
    4.3.8.ACL mqadmin配置管理命令
    Dledger快速搭建:自动容灾切换
    4.4.1.先构建 DLedger,然后构建RocketMQ
    1.构建 DLedger
    git clone https://github.com/openmessaging/openmessaging-storage-dledger.git
    cd openmessaging-storage-dledger
    mvn clean install -DskipTests
    2.构建 RocketMQ
    git clone https://github.com/apache/rocketmq.git
    cd rocketmq
    git checkout -b store_with_dledger origin/store_with_dledger
    mvn -Prelease-all -DskipTests clean install -U
    4.4.2.快速部署
    1.在构建成功后
    cd distribution/target/apache-rocketmq
    sh bin/dledger/fast-try.sh start
    如果上面的步骤执行成功,可以通过 mqadmin 运维命令查看集群状态。
    sh bin/mqadmin clusterList -n 127.0.0.1:9876
    顺利的话,会看到内容(BID 为 0 的表示 Master,其余都是 Follower)
    2.关闭快速集群,可以执行:
    sh bin/dledger/fast-try.sh stop
    3. 容灾切换
    部署成功,杀掉 Leader 之后(在上面的例子中,杀掉端口 30931 所在的进程),等待约 10s 左右,用 clusterList 命令查看集群,就会发现 Leader 切换到另一个节点了。
    Dledger集群搭建:相同名称的Broker,至少三个,配置可参考conf/dledger
    4.5.1.新集群部署:
    1.可自动容灾切换,也可水平扩展
    2.conf/dledger/broker-n0.conf 的配置举例
    brokerClusterName = RaftCluster
    brokerName=RaftNode00
    listenPort=30911
    namesrvAddr=127.0.0.1:9876
    storePathRootDir=/tmp/rmqstore/node00
    storePathCommitLog=/tmp/rmqstore/node00/commitlog
    enableDLegerCommitLog=true
    dLegerGroup=RaftNode00
    dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
    ## must be unique
    dLegerSelfId=n0
    sendMessageThreadPoolNums=16
    3.启动:
    nohup sh bin/mqbroker -c conf/dledger/xxx-n0.conf &
    nohup sh bin/mqbroker -c conf/dledger/xxx-n1.conf &
    nohup sh bin/mqbroker -c conf/dledger/xxx-n2.conf &
    4.5.2.旧集群升级:采用Master或者Master-Slave方式部署,每个都需要转换成集群
    1.杀掉旧的 Broker:kill命令或者bin/mqshutdown broker
    2.确保Commitlog前后一致
    3.修改配置和重新启动Broker和新集群的相同


    运维管理

    介绍单Master模式、多Master模式、多Master多slave模式等RocketMQ集群各种形式的部署方法以及运维工具mqadmin的使用方式。
    https://github.com/apache/rocketmq/blob/master/docs/cn/operation.md

  • 相关阅读:
    获取SQLSERVER所有库 所有表 所有列 所有字段信息
    无法嵌入互操作类型,请改用适用的接口 的解决方法
    注册Com组件..
    IIS站点无法访问..点浏览IIS窗口直接关掉
    数据库日志文件的收缩
    由于目标机器积极拒绝,无法连接。
    Log4Net使用方法
    WindowsService 创建.安装.部署
    蓝桥杯题库基础练习字母图形
    修改IDEA默认模板
  • 原文地址:https://www.cnblogs.com/cambra/p/13716293.html
Copyright © 2011-2022 走看看