zoukankan      html  css  js  c++  java
  • rocketmq入门笔记

    消息队列经典场景

    优点

    1. 异步
      • 原来的下单场景只是用户支付即可结束,现在需要发送成功短信,给用户增加积分,订阅物流信息等等,这就使得用户的 下单时间大大加长,这样就可以使用消息队列,把各个动作发到消息队列,每个服务再去拉取消息队列中的东西进行处理.大大减少时间
    2. 解耦:增加积分,发送短信这些可以单独拆分出来,需要使用直接发送到知道的消息队列就行,你只需要关注你当前的业务
    3. 削峰: 如果使用线程池来解决,一个服务一个线程在高峰期你的mysql或者redis可能撑不住,使用mq就可以限制主机每次只拉取多少条进行处理

    缺点

    1. 可用性降低
      引入了mq,一旦mq宕机对业务有影响
    2. 复杂度提高
      数据链路变得复杂,如何保证顺序性,不重复消费
    3. 一致性问题
      用户支付了,增加积分出错该怎么处理

    整体架构

    1. nameserver 相当于注册中心,连接从这里取ip
    2. broker 消息仓库,里面有topic与队列
    3. product,consumer生产者消费者

    安装

    1. 基本的环境yum install java-1.8.0-openjdk-devel.x86_64 wget vim unzip -y
    2. 下载mq安装包wget https://archive.apache.org/dist/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip
    3. 解压缩unzip rocketmq-all-4.7.1-bin-release.zip -d /usr/local/
    4. 启动nameserver服务
      1. vim bin/runserver.sh
      2. 默认堆初始化最大都是4g,新生代2g,测试机没这么内存,不修改无法启动,改为256m,新生代128mJAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
      3. 后台启动nohup bin/mqnamesrv > n1.out &
    5. 启动broker服务
      1. vim bin/runbroker.sh
      2. 默认堆初始化最大都是8g,新生代4g,测试机没这么内存,不修改无法启动,改为512m,新生代256mJAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"
      3. 暴露namserver地址echo 'export NAMESRV_ADDR=localhost:9876' >> ~/.bash_profile
      4. 后台启动nohup bin/runbroker.sh >n2.out &
    6. 日志验证
      • n1.out The Name Server boot success. serializeType=JSON
      • n2.out The broker[localhost.localdomain, 192.168.147.134:10911] boot success. serializeType=JSON and name server is localhost:9876
    7. 发送接收测试
      1. 发送bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
      2. 接收bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
    8. 关闭
      • 关闭nameserver服务bin/mqshutdown namesrv
      • 关闭broker服务bin/mqshutdown broker

    集群

    1. 4种高可用集群
      • 多master模式
        • 优点:配置简单,性能最高
        • 缺点:单个宕机,这台机器上违背消费的消息不可订阅
      • 多master多salve 异步复制
        • 优点:消息丢失少(异步复制),消息实时性不受到影响,master宕机可以从slave上消费,性能与多master基本一致
        • 缺点:master宕机下会丢失少量消息
      • 多master多salve 同步双写
        • 优点:master宕机,消息无延迟,可用性高
        • 缺点:性能有所丢失
      • dledger模式:4.5版本之前采用master-slave架构部署但是master挂掉都slave无法自动晋升为master,这种模式可以将多个master-slave组成一个组,当组内master挂了将选举一个master继续服务

    集群搭建

    1. 修改vim conf/2m-2s-async/broker-a.properties配置文件
    #名字一样一个集群
    brokerClusterName=DefaultCluster
    #名字一样一个主从
    brokerName=broker-a
    # 0表示master >0标识slave
    brokerId=0
    # 删除文件时间
    deleteWhen=04
    # namesrv集群
    namesrvAddr=work1:9876;work2:9876
    # 默认创建队列数
    defaultTopicQueueNums=4
    # 自动创建队列
    autoCreateTopicEnable=true
    # 对外监听端口
    listenPort=10911
    #文件保留时间 默认48h
    fileReservedTime=120
    #commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #强制销毁文件间隔时间
    #destroyMapedFileIntervalForcibly=120000
    #重载文件时间
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间 
    diskMaxUsedSpaceRatio=88
    #存储路径 
    storePathRootDir=/usr/local/rocketmq-all-4.7.1-bin-release/store
    #commitLog 存储路径 
    storePathCommitLog=/usr/local/rocketmq-all-4.7.1-bin-release/store/commitlog
    #消费队列存储路径存储路径 
    storePathConsumeQueue=/usr/local/rocketmq-all-4.7.1-bin-release/store/consumequeue
    #消息索引存储路径 
    storePathIndex=/usr/local/rocketmq-all-4.7.1-bin-release/store/index
    #checkpoint 文件存储路径 
    storeCheckpoint=/usr/local/rocketmq-all-4.7.1-bin-release/store/checkpoint
    #abort 文件存储路径
    abortFile=/usr/local/rocketmq-all-4.7.1-bin-release/store/abort
    #限制的消息大小 
    maxMessageSize=65536 
    #flushCommitLogLeastPages=4 
    #flushConsumeQueueLeastPages=2 
    #flushCommitLogThoroughInterval=10000 
    #flushConsumeQueueThoroughInterval=60000 
    #Broker 的角色 
    #- ASYNC_MASTER 异步复制Master 
    #- SYNC_MASTER 同步双写Master 
    #- SLAVE 
    brokerRole=ASYNC_MASTER 
    #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 
    flushDiskType=ASYNC_FLUSH 
    #checkTransactionMessageEnable=false 
    #发消息线程池数量 
    #sendMessageThreadPoolNums=128 
    #拉消息线程池数量 
    #pullMessageThreadPoolNums=128
    
    1. 将broker-a.properties写入到broker-b-s.properties修改brokerName,brokerId,brokerRole和几个文件存储路径,同一台虚拟机注意端口号也需要修改
    2. 克隆当前虚拟机,修改broker-a-s.properties,broker-b.properties文件
    3. 修改host文件vim /etc/hosts
    192.168.147.134 work1
    192.168.147.135 work2
    
    1. 启动两台nameservernohup bin/mqnamesrv >n1.out &
    2. 启动broker,使用-c指定配置文件nohup bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties >nb.out &
    3. 关闭防火墙或者开放9876,两个broker服务的端口firewall-cmd --zone=public --add-port=9876/tcp --add-port=10911/tcp --add-port=11011/tcp --permanent``firewall-cmd --reload
    4. 四个broker服务都启动后验证集群bin/mqadmin clusterList -n work1:9876

    控制台搭建

    1. 项目地址rocketmq-dashboard
    2. 项目克隆git clone https://github.com/apache/rocketmq-dashboard.git
    3. 打开rocketmq-console导入idea,修改application.properties文件rocketmq.config.namesrvAddr=work1:9876;work2:9876以实际情况修改
    4. 打包项目上传jar包,启动nohup java -jar rocketmq-console-ng-2.0.0.jar &
    5. 打开浏览器访问当前服务器8080端口

    dledger集群搭建

    1. 快速演示
      1. bin/dledger/fast-try.sh快速演示的脚本,但脚本给一个broker的内存是1g,虚拟机没有这么大修改一下
      2. 这里我修改为256m
        function startNameserver() {
            export JAVA_OPT_EXT=" -Xms256m -Xmx256m  "
            nohup bin/mqnamesrv &
        }
        
        function startBroker() {
            export JAVA_OPT_EXT=" -Xms256m -Xmx256m  "
            conf_name=$1
            nohup bin/mqbroker -c $conf_name &
        }
        
      3. 启动bin/dledger/fast-try.sh start
      4. 查看集群情况bin/mqadmin clusterList -n 127.0.0.1:9876
      5. 查询master节点进程号并把它kill,看slave是否能转为master
    2. 实际搭建
      1. 配置文件增加一下几条
      #是否启动dledger
      enableDLegerCommitLog=true
      #组名与brokerName保持一致
      dLegerGroup=broker-a
      #当前组所有主机-专门监听端口号
      dLegerPeers=n0-192.168.147.134:40911;n1-192.168.147.135:49011;n2-192.168.147.134:40912
      #当前主机id
      dLegerSelfId=n0
      
      1. 集群搭建成功
      2. 直接把135主机关机了
      3. 切换成功

    基本概念

    消息模型

    producer生产消息,consumer消费消息,broker存储消息,每个broker对于一台服务器,每个broker可以存储多个opic消息,每个topic消息也可以分片存储于不同的broker上,message queue用于存储多个消息的物理地址,每个topic消息存储于多个message queue中

    生产者

    producer负责生产消费,将消费者消息发送到broker上,有多种发送方式:同步发送,异步发送,顺序发送,单向发送,同步与异步需要broker返回确认消息,单向发送不需要。同一类producer组成一个集合为生产组发送同一类消息且逻辑一致,如果有异常,broker服务器会联系同一生产者组提交或回滚

    消费者

    consumer消息者形式分为两种:

    1. 拉取式:主动式消费,消费者调用拉取的方法
    2. 推动式消费:broker有数据就会推给消费者
      消费者组必须订阅同一个topic,消息模式两种:
    3. 集群消费模式:平摊消费
    4. 广播消费模式:共享消费

    主题

    每个topic若干个消息,每个消息只能有一个主题,同一个topic下的数据分片保存到不同的broker,每个分片单位是messageQueue

    代理服务器

    • 几个模块
      • remoting module:处理来自clients的请求
      • client manager:负责管理客户端和维护消费者的topic订阅信息
      • store service:处理消息的存储查询功能
      • ha service:高可用服务,负责master与slave的数据同步
      • index service:索引服务,以提高查询
    • 普通集群
      • 每个节点固定角色,master负责响应客户端请求并存储消息,slave负责同步数据并响应客户端部分读请求
    • dledger高可用集群
      • dledger
        1. 接管broker的commitlog消息存储
        2. 选举leader节点
        3. 完成消息同步
      • 多副本消息同步
        leader收到消息会将消息标记为uncommited状态,发给follower,follower收到消息后需要给leader返回一个ack,如果有超过半数的follower返回ack就会把消息改为commited状态,发给follower
      • leader选举机制
        • 每个节点有三个状态,leader,follower,candidate(候选人)
        • 每个时间点叫做term
        • 集群启动时,每个节点都是follower,集群内部发送一个timeout信号,follower转为候选人,发起投票后收到半数以上的投票晋升为leader,
        • 选举过程,集群启动,三个节点都是follower,三个节点都给自己投票,term都是1,三个节点随机休眠,a启动term加一为2,第二个节点醒来,发现a的term比自己大,承认a是leader,c同理

    名字服务器

    充当路由消息的提供者,broker会在启动时向nameserver注册自己的服务信息,后续通过心跳维护当前服务的可用性,生产者或消费者通过名字服务查找各主题消息相应的broker ip列表

    消息

    每个消息都必须拥有一个topic,每个消息拥有唯一的message id,且可以携带业务标识key, 可以为消息设置一个tag标签

    消息存储

    消息存储
    • 时间
      • mq收到消息标记为uncommit状态发给follower,follower收到消息,发给leader一个ack,超过半数follower返回ack,消息改为commit状态,存储,状态同步给follower
      • mqpush消息给消费者,等待消费者ack响应,标记为已消费,没有标记消息会重复推送
      • mq会定期删除一些过期的消息
    • 存储介质:磁盘文件(采用顺序读写,保证存储的速度,采用mmap的方式,省去上下文切换,提高速度)
    消息存储结构
    1. commitlog:存储消息元数据,每个文件1个G
    2. consumerQueue:消息队列,保存commitlog的索引
    3. indexFile:提供通过key或时间来查询消息的方法
    刷盘机制
    1. 同步刷盘:消息写入机器的内存时,通知刷盘线程刷盘,等待刷盘线程写入完成后唤醒线程,返回写入完成
      • 优点:稳定安全
      • 缺点:性能不如异步
    2. 异步刷盘:消息写入内存后,返回写入完成,当内存累计到一定程度是统一触发刷盘操作
      • 优点:吞吐量大
      • 缺点:一旦服务器断电丢失部分消息
    主从复制
    1. 同步复制:生产者发送消息,只有master与slave(半数slave)写入成功才反馈生产者写入成功
    2. 异步复制:生产者发送消息,只要master写入消息成功,就反馈生产者写入成功,再异步将消息同步到slave
    负载均衡
    1. 生产者负载均衡:
      • 生产者发送消息时,获取当前topic下所有broker集合,采用取模递增算法将消息往不同的broker上发送
    2. 消费者负载均衡
      • 集群模式:六种分配算法
        1. AllocateMachineRoomNearby:同机房的消费者与broker分配一起
        2. AllocateMessageQueueAveragely:平均分配,将所有消息队列平均分配给消费者,先算数后分配
        3. AllocateMessageQueueAveragelyByCircle:先轮流给消费者分配一个队列,后面再增加
        4. AllocateMessageQueueByConfig:直接指定所有队列
        5. AllocateMessageQueueByMachineRoom:按逻辑机房进行分配
        6. AllocateMessageQueueConsistentHash:
      • 广播模式:每个消费者分配所有的队列
    消息重试

    广播模式下不存在消息重试,会直接消费下一条

    1. 如何重试
      消息监听器中配置
      1. 返回Action.ReconsumeLater
      2. 返回null
      3. 抛出异常
        不重试返回Action.CommitMessage
    2. 重试处理
      重试的消息会进入“%RETRY%”+ConsumeGroup队列,最多16次,16次后会进入死信队列,可配置例如20次,16次后酶促间隔2h
      16次每次间隔10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h
    3. messageId
      老版本中,无论重试多少次messageId是相同的,4.7.1中每次重试messageId会重建
    4. 配置覆盖
      最大重试次数对同一个消费组实例有效,最后启动的消费者会覆盖之前的配置
    死信队列
    1. 一个死信队列对于一个消费组,而不是一个消费者
    2. 一个消费组不需要死信队列是不会创建死信队列的
    3. 一个死信队列包含这个消费组所有无法消费的消息,不区分主题
    4. 消息无法再被消费者正常消费
    5. 默认存储3天,不管是否消费被删除
    6. 默认死信队列中的消息无法读取,需要将权限配置为6
    消息幂等

    当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这整个过程就可实现消息幂等。支付时重复提交了多次但最后还是只支付了一次的钱

    • 三种实现语义
      • at most once:每条消息最多消费一次
      • at least once:每条消息至少消费一次
      • exactly one:确定消费一次
        rocketmq支持at least once语义
    • 消息重复情况
      • 发送重复:消息发送到服务端并且持久化了,网络断开或者宕机了,生产者判断发送失败了会造次发送
      • 投递重复:消费者收到消息并完成业务处理了,准备发送消息接收时宕机了,服务端在恢复后会再次发送一遍这个消息
      • 负载均衡时消息重复:broker服务重启,扩容,缩容会触发rebalance造成消费者收到重复的消息
    • 解决:
      • 业务唯一标识:例如订单号
      • 利用数据库唯一索引或主键索引
      • 利用redis判断

    实操

    dledger模式不支持批量发送/升级v4.8+

    基础消息

    发送者
    package cn.jaminye.sample.producer;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    /**
     * @author Jamin
     * @date 2021/8/15 9:36
     */
    
    public class Producer {
    	public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
    
    		DefaultMQProducer product = new DefaultMQProducer("product");
    		product.setNamesrvAddr("192.168.147.134:9876");
    		product.start();
    
    		/**
    		 * 同步发送
    		 */
    		Message message = new Message("java-topic", "hello-world".getBytes());
    		SendResult sendResult = product.send(message);
    		/**
    		 * 批量发送 topic必须相同
    		 */
    		/*Message message1 = new Message("batch-topic2", "hello-world1".getBytes());
    		Message message2 = new Message("batch-topic2", "hello-world2".getBytes());
    		Message message3 = new Message("batch-topic2", "hello-world3".getBytes());
    		List<Message> messages = new ArrayList<>(8);
    		messages.add(message1);
    		messages.add(message2);
    		messages.add(message3);
    		SendResult sendResult = product.send(messages);*/
    		/**
    		 *异步发送
    		 */
    		/*CountDownLatch countDownLatch = new CountDownLatch(1);
    		Message message = new Message("async-topic", "async-topic".getBytes());
    		product.send(message, new SendCallback() {
    			@Override
    			public void onSuccess(SendResult sendResult) {
    				countDownLatch.countDown();
    			}
    
    			@Override
    			public void onException(Throwable throwable) {
    				System.err.println(throwable.getMessage());
    			}
    		});
    		countDownLatch.await();*/
    		/**
    		 * 单向发送
    		 */
    		//主题,标签,key,内容
    		/*Message message = new Message("send-one-way", "tag1", "1", "send-one-way".getBytes());
    		product.sendOneway(message);
    		//这种方法无返回值,等待发送完成
    		Thread.sleep(5000);*/
    
    		// System.out.println("=================发送结果=============" + sendResult);
    		product.shutdown();
    	}
    }
    
    
    消费者
    package cn.jaminye.sample.consumer;
    
    /**
     * 拉
     *
     * @author Jamin
     * @date 2021/8/15 14:48
     */
    public class Consumer {
    	/**
    	 * 拉模式
    	 *
    	 * @param args
    	 * @author Jamin
    	 * @date 2021/8/16 10:13
    	 */
    	// public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
    	// 	/**
    	// 	 * 拉模式,已弃用方式
    	// 	 */
    	// 	/*DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("pull-group");
    	// 	consumer.setNamesrvAddr("192.168.147.134:9876");
    	// 	consumer.start();
    	// 	MessageQueue messageQueue = new MessageQueue();
    	// 	messageQueue.setQueueId(2);
    	// 	messageQueue.setBrokerName("broker-a");
    	// 	messageQueue.setTopic("java-topic");
    	// 	PullResult pullResult = consumer.pullBlockIfNotFound(messageQueue, null, 0, 2);
    	// 	pullResult.getMsgFoundList().forEach(System.out::println);
    	// 	consumer.shutdown();*/
    	// 	/**
    	// 	 * 现用
    	// 	 */
    	// 	DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("pull-group");
    	// 	consumer.setNamesrvAddr("192.168.147.134:9876");
    	// 	consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    	// 	consumer.subscribe("java-topic", "*");
    	// 	consumer.start();
    	// 	List<MessageExt> messageExtList = consumer.poll();
    	// 	messageExtList.forEach(System.out::println);
    	// 	consumer.shutdown();
    	// }
    
    	/**
    	 * 推模式
    	 *
    	 * @param args
    	 * @author Jamin
    	 * @date 2021/8/16 10:13
    	 */
    	/*public static void main(String[] args) throws MQClientException {
    		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pull-group");
    		consumer.setNamesrvAddr("192.168.147.134:9876");
    		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    		consumer.subscribe("java-topic", "*");
    		//负载
    		consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragelyByCircle());
    		consumer.registerMessageListener(new MessageListenerConcurrently() {
    			@Override
    			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    				System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), list);
    				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    			}
    		});
    		consumer.start();
    
    	}*/
    
    
    }
    

    顺序消息

    生产者
    package cn.jaminye.order.product;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.MessageQueueSelector;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageQueue;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    import java.io.UnsupportedEncodingException;
    import java.util.List;
    
    /**
     * @author Jamin
     * @date 2021/8/16 10:24
     */
    public class Product {
    	public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
    		DefaultMQProducer producer = new DefaultMQProducer("order-group");
    		producer.setNamesrvAddr("192.168.147.134:9876");
    		producer.start();
    		String[] strings = {"下单", "付款", "生成订单"};
    		for (int i = 0; i < 100; i++) {
    			for (int j = 0; j < 3; j++) {
    				String s = "订单__" + i + "___" + strings[j];
    				Message message = new Message("order-topic", s.getBytes(RemotingHelper.DEFAULT_CHARSET));
    				//根据id取模入队列使分类消息进一个队列
    				producer.send(message, new MessageQueueSelector() {
    					@Override
    					public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
    						int index = ((Integer) o) % list.size();
    						return list.get(index);
    					}
    				}, i);
    			}
    		}
    		producer.shutdown();
    	}
    }
    
    消费者
    package cn.jaminye.order.consumer;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    /**
     * @author Jamin
     * @date 2021/8/16 10:36
     */
    public class Consumer {
    	public static void main(String[] args) throws Exception {
    		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order-consumer");
    		//消费组订阅的消息未过期从头开始,已过期从当前开始
    		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    		consumer.subscribe("order-topic", "*");
    		consumer.setNamesrvAddr("192.168.147.134:9876");
    		//顺序取
    		consumer.registerMessageListener(new MessageListenerOrderly() {
    			@Override
    			public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
    				list.stream().map(messageExt -> new String(messageExt.getBody())).forEach(System.out::println);
    				return ConsumeOrderlyStatus.SUCCESS;
    			}
    		});
    		/*consumer.registerMessageListener(new MessageListenerConcurrently() {
    			@Override
    			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    				list.stream().map(messageExt -> new String(messageExt.getBody())).forEach(System.out::println);
    				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    			}
    		});*/
    		consumer.start();
    	}
    }
    

    广播消息

    consumer.setMessageModel(MessageModel.BROADCASTING);

    延迟消息

    //1-18 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    message1.setDelayTimeLevel(3);
    

    批量消息

    /**
     * 批量发送 topic必须相同  官方示例中小于1m 不能是延迟,事务消息
     */
    Message message1 = new Message("batch-topic2", "hello-world1".getBytes());
    Message message2 = new Message("batch-topic2", "hello-world2".getBytes());
    Message message3 = new Message("batch-topic2", "hello-world3".getBytes());
    List<Message> messages = new ArrayList<>(8);
    messages.add(message1);
    messages.add(message2);
    messages.add(message3);
    SendResult sendResult = product.send(messages);
    

    过滤消息

    1. 表达式过滤
      consumer.subscribe("filter-topic", "TAG1 || TAG2");
    2. sql过滤
    • 需要配置enablePropertyFilter=true
    • message1.putUserProperty("a", "1");
    • consumer.subscribe("filter-topic", MessageSelector.bySql("TAGS IN ('TAG1','TAG2') AND a between 0 and 1 "));
    • 基本语法>,<,>=,between,in,and,or,not

    事务消息

    • 代码
      //组名不能与其他组名相同
      TransactionMQProducer transactionGroupProducer = new TransactionMQProducer("transactionGroup");
      ExecutorService executorService = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100));
      transactionGroupProducer.setExecutorService(executorService);
      transactionGroupProducer.setNamesrvAddr("192.168.147.134:9876");
      TransactionListenerImpl transactionListener = new TransactionListenerImpl();
      transactionGroupProducer.setTransactionListener(transactionListener);
      transactionGroupProducer.start();
      for (int i = 0; i < 10; i++) {
      	Message message = new Message("transaction-topic", String.valueOf(i).getBytes());
      	message.putUserProperty("name", String.valueOf(i));
      	TransactionSendResult transactionSendResult = transactionGroupProducer.sendMessageInTransaction(message, null);
      	System.out.println(transactionSendResult.getSendStatus());
      }
      }
      
      public class TransactionListenerImpl implements TransactionListener {
      	@Override
      	public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
      
      		try {
      			//开启事务
      			//	do something
      			if ("1".equals(msg.getProperty("name"))) {
      				System.out.println("unknow");
      				return LocalTransactionState.UNKNOW;
      			}
      			System.out.println("success");
      			return LocalTransactionState.COMMIT_MESSAGE;
      		} catch (Exception ex) {
      			System.out.println("回滚事务");
      			return LocalTransactionState.ROLLBACK_MESSAGE;
      		}
      
      
      	}
      
      	@Override
      	public LocalTransactionState checkLocalTransaction(MessageExt msg) {
      		System.out.println("进入check");
      		//	do something query db
      		return true ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.UNKNOW;
      
      	}
      }
      
    • 流程
      1. 发送消息到服务端,这个消息暂存在服务端,不会被消费者读取到
      2. 持久化成功后会返回生产者一个ack,确认消息是否成功
      3. 成功回调执行executeLocalTransaction方法,执行本地事务,持久化到数据库类的操作,这块的回滚自行处理,最终返回本地事务的执行结果
      4. 根据返回结果进行操作,commit的话会将当前消息移动到实际的topic下,回滚就删除消息
      5. 如果本地事务返回unknown,服务端会定时调用checkLocalTransaction方法进行查询,最多15次
      6. 根据checkLocalTransaction方法进行执行回滚或者提交

    acl 权限控制

    1. 开启权限控制
    aclEnable=true
    
    1. 配置文件
    #全局白名单
    globalWhiteRemoteAddresses:
    #- 192.168.147.*
    
    accounts:
    - accessKey: RocketMQ
      secretKey: 12345678
     #白名单地址
      whiteRemoteAddress:
      admin: false
      defaultTopicPerm: DENY
      defaultGroupPerm: SUB
      #针对每个主题
      topicPerms:
      - topicA=DENY
      - topicB=PUB|SUB
      - topicC=SUB
      - java-topic=DENY
      groupPerms:
      # the group should convert to retry topic
      - groupA=DENY
      - groupB=PUB|SUB
      - groupC=SUB
      - product2=DENY
    - accessKey: rocketmq2
      secretKey: 12345678
      whiteRemoteAddress: 192.168.1.*
      # if it is admin, it could access all resources
      admin: true
    
    1. 代码
    DefaultMQProducer product = new DefaultMQProducer("product2", new AclClientRPCHook(new SessionCredentials("RocketMQ", "12345678")));
    product.setNamesrvAddr("192.168.147.134:9876");
    product.start();
    Message message = new Message("java-topic", "hello-world".getBytes());
    SendResult sendResult = product.send(message);
    System.out.println(sendResult);
    product.shutdown();
    

    springboot整合rocketmq

    依赖

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.0</version>
    </dependency>
    

    配置

    rocketmq.name-server=192.168.147.134:9876
    rocketmq.consumer.group=springboot-group
    

    普通消息发送

    @Component
    public class SpringProducer {
    	@Resource
    	RocketMQTemplate mqTemplate;
    
    	public void sendMessage() {
    		Message<String> message = MessageBuilder.withPayload("12345").build();
    		//topic:tag
    		mqTemplate.syncSend("topic-1" + ":" + "TAG1", message, 100000);
    		mqTemplate.syncSend("topic-1" + ":" + "TAG2", message, 100000);
    	}
    
    }
    
    @Component
    // selectorType 过滤使用tag还是sql selectorExpression tag或者sql consumeMode顺序还是正常的 messageModel广播还是集群
    @RocketMQMessageListener(topic = "topic-1", consumerGroup = "springboot-group", selectorType = SelectorType.SQL92, selectorExpression = "TAGS='TAG1'", consumeMode = ConsumeMode.CONCURRENTLY,
    		messageModel = MessageModel.CLUSTERING)
    public class SpringConsumer implements RocketMQListener<String> {
    	@Override
    	public void onMessage(String s) {
    		System.out.println(s);
    	}
    }
    
    事务消息
    @Component
    public class Producer {
    	@Resource
    	RocketMQTemplate rocketMQTemplate;
    
    	public void sendMessage() {
    		Message<String> message1 =
    				MessageBuilder.withPayload("123").setHeader(RocketMQHeaders.TRANSACTION_ID, "1").setHeader(RocketMQHeaders.TOPIC, "123")
    						.setHeader(RocketMQHeaders.TAGS, "1231").setHeader("a", 1).build();
    		TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction("springboot-producer1:TAG1", message1, null);
    		System.out.println(transactionSendResult);
    	}
    }
    
    @RocketMQTransactionListener
    public class Listener implements RocketMQLocalTransactionListener {
    	@Override
    	public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
    		System.out.println("message===============" + message);
    		// 获取时添加前缀RocketMQHeaders.PREFIX
    		String tags = message.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TAGS, String.class);
    		System.out.println("id==================" + tags);
    		System.out.println("UNKNOWN==================");
    		return RocketMQLocalTransactionState.UNKNOWN;
    	}
    
    	@Override
    	public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
    		System.out.println("message===============" + message.getPayload());
    		return RocketMQLocalTransactionState.COMMIT;
    	}
    }
    
    总结
    1. 使用RocketMQTemplate进行发送消息,相关属性都以rocketmq_开头
    2. topic:tags

    源码阅读

    环境搭建

    1. 源码地址 源码地址 使用4.7.1版本源码
    2. 在项目根目录下创建conf文件夹,复制distribution下broker.conf,logback_broker.xml,logback_nameserv.xml三个文件到conf下
    3. 在本机添加环境变量ROCKETMQ_HOME指向项目根目录
    4. 启动nameser
    5. 修改conf目录下的broker.conf 添加namesrvAddr,storePathRootDir,storePathRootDir,storePathCommitLog,storePathConsumeQueue,storePathIndex,storeCheckpoint,abortFile等参数具体可参考上方配置
    6. 启动broker 配置启动参数-c broker.conf文件地址

    namesever

    1. 配置信息:创建nameseverconfig与nettyserverconfig
    2. 初始化,启动,监听9876端口,提供给客户端拉取路由信息
    3. 创建处理请求的线程与定时扫描的线程(10s扫描一次,判断最后最后更新时间+2分钟,超出会删除这个broker并关闭连接)

    broker

    1. 启动了很多组件
    2. 注册到nameserver,每30s(可以配置修改但最长为60s)发送一次心跳

    producer

    1. DefaultMQProducerImpl:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
    2. 判断组名是否符合规定
    3. 启动各种定时任务,缓存nameserver上所有的主题,与broker建立心跳
    4. 发送消息采用索引自增取模的方式进行

    文件存储

    1. org.apache.rocketmq.store.DefaultMessageStore#putMessage
    2. 使用零拷贝追加到commitlog,同步或异步刷盘,主从同步
    3. 定时任务:每10s启动启动一次,
    作者: JaminYe
    版权声明:本文原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
  • 相关阅读:
    MySQL——SELECT
    启动 MySQL
    Ethernet and ARP 及Wireshark实验
    ICMP 协议及Wireshark实验
    Wireshark实验——IP 协议
    关键路径
    用树结构存储的图博客(笑)
    拓扑排序
    云计算部署的未来趋势将从自动化转向为自主化
    苹果拥抱 Rust,正在将 C 代码移植到 Rust
  • 原文地址:https://www.cnblogs.com/JaminYe/p/15559170.html
Copyright © 2011-2022 走看看