zoukankan      html  css  js  c++  java
  • 消息中间件——rocketmq环境配置

    产生原因

    在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述

    RocketMQ概述

    RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点: 能够保证严格的消息顺序 提供丰富的消息拉取模式 高效的订阅者水平扩展能力 实时的消息订阅机制 亿级消息堆积能力

    RocketMQ包含的组件

    NameServer:单点,供Producer和Consumer获取Broker地址
    Producer:产生并发送消息
    Consumer:接受并消费消息
    Broker:消息暂存,消息转发

    在这里插入图片描述

    • Name Server
      Name Server是RocketMQ的寻址服务。用于把Broker的路由信息做聚合。客户端依靠Name Server决定去获取对应topic的路由信息,从而决定对哪些Broker做连接。
      Name Server是一个几乎无状态的结点,Name Server之间采取share-nothing的设计,互不通信。
      对于一个Name Server集群列表,客户端连接Name Server的时候,只会选择随机连接一个结点,以做到负载均衡。
      Name Server所有状态都从Broker上报而来,本身不存储任何状态,所有数据均在内存。
      如果中途所有Name Server全都挂了,影响到路由信息的更新,不会影响和Broker的通信。

    • Broker
      Broker是处理消息存储,转发等处理的服务器。
      Broker以group分开,每个group只允许一个master,若干个slave。
      只有master才能进行写入操作,slave不允许。
      slave从master中同步数据。同步策略取决于master的配置,可以采用同步双写,异步复制两种。
      客户端消费可以从master和slave消费。在默认情况下,消费者都从master消费,在master挂后,客户端由于从Name Server中感知到Broker挂机,就会从slave消费。
      Broker向所有的NameServer结点建立长连接,注册Topic信息。

    1.强调集群无单点,可扩展
    2.任意一点高可用,水平可扩展
    3.海量消息堆积能力,消息堆积后,写入低延迟。
    4.支持上万个队列
    5.消息失败重试机制
    6.消息可查询
    7.开源社区活跃
    8.成熟度(经过双十一考验)

    队列特点

    1、异步,无需等待
    在这里插入图片描述
    2.解耦,方便系统的扩展

    如果没有消息队列,每当一个新的业务接入,我们都要在主系统调用新接口、或者当我们取消某些业务,我们也得在主系统删除某些接口调用。有了消息队列,我们只需要关心消息是否送达了队列,至于谁希望订阅,接下来收到消息如何处理,是下游的事情,无疑极大地减少了开发和联调的工作量。

    3、削峰,高并发流量
    在这里插入图片描述

    RocketMQ中的消息模型

    在这里插入图片描述

    • Producer Group 生产者组:代表某一类的生产者,比如我们有多个秒杀系统作为生产者,这多个合在一起就是一个 Producer Group 生产者组,它们一般生产相同的消息。
    • Consumer Group 消费者组:代表某一类的消费者,比如我们有多个短信系统作为消费者,这多个合在一起就是一个 Consumer Group 消费者组,它们一般消费相同的消息。
    • Topic 主题:代表一类消息,比如订单消息,物流消息等等。

    你可以看到图中生产者组中的生产者会向主题发送消息,而 主题中存在多个队列,生产者每次生产消息之后是指定主题中的某个队列发送消息的。
    每个主题中都有多个队列(这里还不涉及到 Broker),集群消费模式下,一个消费者集群多台机器共同消费一个 topic 的多个队列,一个队列只会被一个消费者消费。如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。就像上图中 Consumer1 和 Consumer2 分别对应着两个队列,而 Consuer3 是没有队列对应的,所以一般来讲要控制 消费者组中的消费者个数和主题中队列个数相同 。

    每个消费组在每个队列上维护一个消费位置 ,为什么呢?

    因为我们刚刚画的仅仅是一个消费者组,我们知道在发布订阅模式中一般会涉及到多个消费者组,而每个消费者组在每个队列中的消费位置都是不同的。如果此时有多个消费者组,那么消息被一个消费者组消费完之后是不会删除的(因为其它消费者组也需要呀),它仅仅是为每个消费者组维护一个 消费位移(offset) ,每次消费者组消费完会返回一个成功的响应,然后队列再把维护的消费位移加一,这样就不会出现刚刚消费过的消息再一次被消费了。

    为什么使用rocketmq

    • 是否支持分布式

    activemq支持集群(结合zk),但不支持分布式
    rocketmq分布式消息中间件,支持集群(主备)、并发量高

    • mq消息堆积,会不会发生宕机

    消费者不会宕机,因为存在缓存消息机制
    消息中间件可能会宕机
    而rocket支持海量消息堆积,支持上万个队列

    • 消息中间件集群策略

    使用的不是主从策略,而是均摊测试,提高消息并发量

    rocketmq原理

    名称功能
    nameServer 存放生产者、消费者投递信息
    Broker 消息缓存
    Producer 生产者
    Consumer 消费者

    在这里插入图片描述

    • NameServer:不知道你们有没有接触过 ZooKeeper 和 Spring Cloud 中的 Eureka ,它其实也是一个 注册中心 ,主要提供两个功能:Broker管理 和 路由信息管理 。说白了就是 Broker 会将自己的信息注册到 NameServer 中,此时 NameServer 就存放了很多 Broker 的信息(Broker的路由表),消费者和生产者就从 NameServer 中获取路由表然后照着路由表的信息和对应的 Broker 进行通信(生产者和消费者定期会向 NameServer 去查询相关的 Broker 的信息)。

    • Producer:消息发布的角色,支持分布式集群方式部署。说白了就是生产者。

    • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制。说白了就是消费者。

    在这里插入图片描述

    在这里插入图片描述

    • 第一、我们的 Broker 做了集群并且还进行了主从部署 ,由于消息分布在各个 Broker 上,一旦某个 Broker 宕机,则该Broker 上的消息读写都会受到影响。所以 Rocketmq 提供了 master/slave 的结构,salve 定时从 master 同步数据(同步刷盘或者异步刷盘),如果 master 宕机,则 slave 提供消费服务,但是不能写入消息 (后面我还会提到哦)。

    • 第二、为了保证 HA ,我们的 NameServer 也做了集群部署,但是请注意它是 去中心化 的。也就意味着它没有主节点,你可以很明显地看出 NameServer 的所有节点是没有进行 Info Replicate 的,在 RocketMQ 中是通过 单个Broker和所有NameServer保持长连接 ,并且在每隔30秒 Broker 会向所有 Nameserver 发送心跳,心跳包含了自身的 Topic 配置信息,这个步骤就对应这上面的 Routing Info 。

    • 第三、在生产者需要向 Broker 发送消息的时候,需要先从 NameServer 获取关于 Broker 的路由信息,然后通过 轮询 的方法去向每个队列中生产数据以达到 负载均衡 的效果。

    • 第四、消费者通过 NameServer 获取所有 Broker 的路由信息后,向 Broker 发送 Pull 请求来获取消息数据。Consumer 可以以两种模式启动—— 广播(Broadcast)和集群(Cluster)。广播模式下,一条消息会发送给 同一个消费组中的所有消费者 ,集群模式下消息只会发送给一个消费者。

    rocketmq集群搭建

    1.7jdk以上,jdk必须64位
    配置jvm参数

    1、安装maven

    镜像地址:
    https://mirrors.cnnic.cn/apache/maven
    
    wget https://mirrors.cnnic.cn/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz
    
    解压:
    tar -zxvf apache-maven-3.6.3-bin.tar.gz
    
    在文件末尾追加环境变量:
    vi /etc/profile:
    MAVEN_HOME=/usr/local/apache-maven-3.6.3
    export MAVEN_HOME
    export PATH=${PATH}:${MAVEN_HOME}/bin
    
    让配置文件立刻生效:
    source /etc/profile
    
    验证:
    mvn -v
    

    2、安装rocketmq

    wget https://github.com/apache/rocketmq/archive/rocketmq-all-4.2.0.tar.gz
    
    tar -zvxf rocketmq-all-4.2.0.tar.gz
    
    进入解压目录:
    mvn -Prelease-all -DskipTests clean install -U
    
    
    cd distribution/target/apache-rocketmq/
    pwd "path"
    
    vim /etc/profile
    export rocketmq="path"
    export PATH=$PATH:$rocketmq/bin
    
    source /etc/profile
    
    
    mkdir /usr/local/log/rocketmqlogs
    
    
    启动nameServer:
    nohup mqnamesrv >/usr/local/log/rocketmqlogs/namesrv.log 2>&1 &  
    tail -f /user/local/log/rocketmqlogs/namesrv.log
    
    启动broker:
    nohup mqbroker -n localhost:9876 >/usr/local/log/rocketmqlogs/broker.log 2>&1 &   
    tail -f /user/local/log/rocketmqlogs/broker.log
    
    
    修改默认RocketMQ内存:
    bin目录下的runserver.sh和runbroker.sh文件
    JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn128m"
    
    关闭:
    mqshutdown namesrv
    mqshutdown broker
    
    可视化管理控制台RocketMQ Console
    

    2、安装rocketmq(可快速安装)

    官网:
    http://rocketmq.apache.org/dowloading/releases/
    wget http://mirrors.hust.edu.cn/apache/rocketmq/4.7.0/rocketmq-all-4.7.0-bin-release.zip
    
    unzip rocketmq-all-4.7.0-bin-release.zip
    
    mv rocketmq-all-4.7.0-bin-release /usr/local/rocketmq 
    
    cd /usr/local/rocketmq/bin
    
    修改内存参数:
    vi runserver.sh、runbroker.sh
    JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn128m"
    
    
    启动nameServer:
    cd /usr/local/rocketmq/bin
    nohup sh mqnamesrv >/usr/local/rocketmq/logs/mqnamesrv.log 2>&1 &
    tail -f /usr/local/rocketmq/logs/mqnamesrv.log
    
    启动broker:
    cd /usr/local/rocketmq/bin
    nohup sh mqbroker -n localhost:9876 >/usr/local/rocketmq/logs/mqbroker.log 2>&1 & 
    tail -f /usr/local/rocketmq/logs/mqbroker.log
    

    安装可视化插件

    1、下载包

    下载:https://github.com/apache/rocketmq-externals/tree/release-rocketmq-console-1.0.0

    2、修改配置

    application.properties:
    在这里插入图片描述
    注释掉plugin,否则会报错:
    在这里插入图片描述

    3、编译打包

    mvn clean package -Dmaven.test.skip=true
    生成target目录,启动:
    上传到linux服务器:
    java -jar rocketmq-console-ng-1.0.0.jar
    如果配置文件没有填写Name Server
    java -jar rocketmq-console-ng-1.0.0.jar --rocketmq.config.namesrvAddr=‘10.0.74.198:9876;10.0.74.199:9876’

    双主模式master搭建

    1、配置hosts

    vi /etc/hosts
    192.168.70.11 rocketmq-nameserver1
    192.168.70.11 rocketmq-master1
    192.168.70.12 rocketmq-nameserver2
    192.168.70.12 rocketmq-master2
    
    重启host服务(centos8):
    systemctl start NetworkManager
    systemctl restart systemd-hostnamed
    测试:
    ping rocketmq-nameserver1
    

    2、创建存储路径

    mkdir /usr/local/rocketmq/store
    mkdir /usr/local/rocketmq/store/commitlog
    mkdir /usr/local/rocketmq/store/consumequeue
    mkdir /usr/local/rocketmq/store/index
    

    3、RocketMQ配置文件

    • vim conf/2m-noslave/broker-a.properties
    • vim conf/2m-noslave/broker-b.properties
    broker-aproperties配置(b类似):
    #所属集群名字
    brokerClusterName=rocketmq-cluster
    #broker名字
    brokerName=broker-a
    #0 表示 Master,>0 表示 Slave
    brokerId=0
    #nameServer地址,分号分割
    namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口
    listenPort=10911
    #删除文件时间点,默认凌晨 4点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=120
    #commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    storePathRootDir=/usr/local/rocketmq/store
    #commitLog 存储路径
    storePathCommitLog=/usr/local/rocketmq/store/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
    #消息索引存储路径
    storePathIndex=/usr/local/rocketmq/store/index
    #checkpoint 文件存储路径
    storeCheckpoint=/usr/local/rocketmq/store/checkpoint
    #abort 文件存储路径
    abortFile=/usr/local/rocketmq/store/abort
    #限制的消息大小
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    #Broker 的角色
    #- ASYNC_MASTER 异步复制Master
    #- SYNC_MASTER 同步双写Master
    #- SLAVE
    brokerRole=ASYNC_MASTER
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH
    #checkTransactionMessageEnable=false
    #发消息线程池数量
    #sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessageThreadPoolNums=128
    
    

    4、修改日志配置文件:

    mkdir -p /usr/local/rocketmq/logs
    cd /usr/local/rocketmq/conf && sed -i ‘s#${user.home}#/usr/local/rocketmq#g’ *.xml

    5、启动服务

    关闭防火墙:
    systemctl stop firewalld.service
    
    启动NameServer:
    cd /usr/local/rocketmq/bin
    nohup sh mqnamesrv &
    
    启动broker-aroker-b:
    cd /usr/local/rocketmq/bin
    nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &
    netstat -ntlp
    jps
    tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log
    tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log
    

    注意事项

    服务器a:
    nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/
    broker-a.properties >/dev/null 2>&1 &
    
    服务器b:
    nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/
    broker-b.properties >/dev/null 2>&1 &
    
    

    在这里插入图片描述
    参考:
    Linux下RocketMQ下载安装教程
    RcoketMq集群安装和RocketMQ

    
    public class Producer {
    
    	public static void main(String[] args) throws MQClientException {
    		DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
    		producer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");
    		producer.setInstanceName("producer");
    		producer.start();
    		try {
    			for (int i = 0; i < 10; i++) {
    				Thread.sleep(1000); // 每秒发送一次MQ
    				Message msg = new Message("link-topic", // topic 主题名称
    						"TagA", // tag 临时值
    						("link-"+i).getBytes()// body 内容
    				);
    				SendResult sendResult = producer.send(msg);
    				System.out.println(sendResult.toString());
    			}
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    		producer.shutdown();
    	}
    
    }
    
    消费者
    
    public class Consumer {
    	public static void main(String[] args) throws MQClientException {
    		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
    
    		consumer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");
    		consumer.setInstanceName("consumer");
    		consumer.subscribe("link-topic", "TagA");
    
    		consumer.registerMessageListener(new MessageListenerConcurrently() {
    			@Override
    			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    				for (MessageExt msg : msgs) {
    					System.out.println(msg.getMsgId()+"---"+new String(msg.getBody()));
    				}
    				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    			}
    		});
    		consumer.start();
    		System.out.println("Consumer Started.");
    	}
    }
    
    

    RocketMQ重试机制

    MQ 消费者的消费逻辑失败时,可以通过设置返回状态达到消息重试的结果。
    MQ 消息重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

    public class Consumer {
    	public static void main(String[] args) throws MQClientException {
    		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
    	consumer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");
    		consumer.setInstanceName("consumer");
    		consumer.subscribe("link-topic", "TagA");
    
    		consumer.registerMessageListener(new MessageListenerConcurrently() {
    			@Override
    			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    				for (MessageExt msg : msgs) {
    					System.out.println(msg.getMsgId() + "---" + new String(msg.getBody()));
    				}
    				try {
    					int i = 1 / 0;
    				} catch (Exception e) {
    					e.printStackTrace();
                                    // 需要重试
    					return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    
    				}
                             // 不需要重试
    				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    			}
    		});
    		consumer.start();
    		System.out.println("Consumer Started.");
    	}
    }
    
    

    注意:每次重试后,消息ID都不一致,所以不能使用消息ID判断幂等。
    解决办法:使用自定义全局ID判断幂等,例如流水ID、订单号
    使用msg.setKeys 进行区分

    public class Producer {
    
    	public static void main(String[] args) throws MQClientException {
    		DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
    		producer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");
    		producer.setInstanceName("producer");
    		producer.start();
    		try {
    			for (int i = 0; i < 1; i++) {
    				Thread.sleep(1000); // 每秒发送一次MQ
    				Message msg = new Message("link-topic", // topic 主题名称
    						"TagA", // tag 临时值
    						("link-6" + i).getBytes()// body 内容
    				);
    				msg.setKeys(System.currentTimeMillis() + "");
    				SendResult sendResult = producer.send(msg);
    				System.out.println(sendResult.toString());
    			}
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    		producer.shutdown();
    	}
    
    }
    消费者:
    	static private Map<String, String> logMap = new HashMap<>();
    
    	public static void main(String[] args) throws MQClientException {
    		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
    
    		consumer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");
    		consumer.setInstanceName("consumer");
    		consumer.subscribe("link-topic", "TagA");
    
    		consumer.registerMessageListener(new MessageListenerConcurrently() {
    			@Override
    			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    				String key = null;
    				String msgId = null;
    				try {
    					for (MessageExt msg : msgs) {
    						key = msg.getKeys();
    						if (logMap.containsKey(key)) {
    							// 无需继续重试。
    							System.out.println("key:"+key+",无需重试...");
    							return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    						}
    						msgId = msg.getMsgId();
    						System.out.println("key:" + key + ",msgid:" + msgId + "---" + new String(msg.getBody()));
    						int i = 1 / 0;
    					}
    
    				} catch (Exception e) {
    					e.printStackTrace();
    					return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    				} finally {
    					logMap.put(key, msgId);
    				}
    				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    			}
    		});
    		consumer.start();
    		System.out.println("Consumer Started.");
    	}
    
    

    使用队列缺点

    • 消息队列挂了:必须要做集群处理,高可用,增加系统复杂度
    • 重复消费消息:幂等性处理
    • 消息的顺序消费:
    • 分布式事务问题:尽量不走队列,否则要保证最终执行成功
    • 消息堆积的问题 :

    1 顺序消费

    分为普通顺序 和 严格顺序 :

    • 所谓普通顺序是指 消费者通过 同一个消费队列收到的消息是有顺序的 ,不同消息队列收到的消息则可能是无顺序的。普通顺序消息在 Broker 重启情况下不会保证消息顺序性 (短暂时间) 。
    • 所谓严格顺序是指 消费者收到的 所有消息 均是有顺序的。严格顺序消息 即使在异常情况下也会保证消息的顺序性 。
      但是,严格顺序看起来虽好,实现它可会付出巨大的代价。如果你使用严格顺序模式,Broker 集群中只要有一台机器不可用,则整个集群都不可用。你还用啥?现在主要场景也就在 binlog 同步。

    解决:

    我们需要处理的仅仅是将同一语义下的消息放入同一个队列(比如这里是同一个订单),那我们就可以使用 Hash取模法 来保证同一个订单在同一个队列中就行了。

    2 重复消费

    可以使用 写入 Redis 来保证,因为 Redis 的 key 和 value 就是天然支持幂等的。当然还有使用 数据库插入法 ,基于数据库的唯一键来保证重复数据不会被插入多条。

    3 分布式事务

    在 RocketMQ 中使用的是 事务消息加上事务反查机制 来解决分布式事务问题的。

    在这里插入图片描述

    4 消息堆积问题

    在上面我们提到了消息队列一个很重要的功能——削峰 。那么如果这个峰值太大了导致消息堆积在队列中怎么办呢?
    其实这个问题可以将它广义化,因为产生消息堆积的根源其实就只有两个——生产者生产太快或者消费者消费太慢。

    5 回溯消费
    回溯消费是指 Consumer 已经消费成功的消息,由于业务上需求需要重新消费,在RocketMQ 中, Broker 在向Consumer 投递成功消息后,消息仍然需要保留 。

  • 相关阅读:
    查询mysql哪些表正在被锁状态
    查询mysql哪些表正在被锁状态
    查询mysql哪些表正在被锁状态
    JS模块化工具requirejs教程(二):基本知识
    JS模块化工具requirejs教程(二):基本知识
    JS模块化工具requirejs教程(一):初识requirejs
    sublime的安装与使用
    清除浏览器默认样式——css reset & normalize.css
    webstorm常见快捷方法与遇到的一些问题
    【Git】commit成功后,GitHub贡献图方格没变绿
  • 原文地址:https://www.cnblogs.com/cndeveloper/p/14347617.html
Copyright © 2011-2022 走看看