啥子背景这些就不多说了,一句带过,由阿里开发,交由Apache孵化,是Apache的一个顶级项目 (反正就是很牛逼),抗过阿里的双11,底层Java开发,反正就是很牛逼,
Producer:消息生产者,负责消息的生产
Consumer:消息消费者
Push Consumer:服务端向消费者推送消息
Pull Consumer:消费者向服务端拉取消息
Consumer Group:一集合的消费者,通常消费指定一类的消息
NameServer:集群架构中的组织协调员
收集broker的工作情况
只起调度分配作用,不存储,不处理消息
Broker:就是队列的核心所在
负责接收消息,发送消息
默认10S向NameServer报告自身情况,超过两分钟未报告默认宕机
Topic:根据Topic区分不同类型的消息
docker安装
通过搜索我们就下载start最多的那个吧
-
下载镜像,我们下载star比较高的
-
docker pull foxiswho/rocketmq:server-4.3.2 docker pull foxiswho/rocketmq:broker-4.3.2
-
-
创建nameserver容器(对应挂载目录自己创建准备好)
-
docker create -p 9876:9876 --name rmqserver -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" -e "JAVA_OPTS=-Duser.home=/opt" -v /ninja_data/rocketmq/nameserver/logs:/opt/logs -v /ninja_data/rocketmq/nameserver/store:/opt/store foxiswho/rocketmq:server-4.3.2
-
-
创建broker容器(对应挂载目录自己创建准备好)
-
docker create -p 10911:10911 -p 10909:10909 --name rmqbroker -e "JAVA_OPTS=-Duser.home=/opt" -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" -v /ninja_data/rocketmq/broker/broker.conf:/etc/rocketmq/broker.conf -v /ninja_data/rocketmq/broker/logs:/opt/logs -v /ninja_data/rocketmq/broker/store:/opt/store foxiswho/rocketmq:broker-4.3.2
-
-
启动两个容器
-
docker start rmqbroker rmqserver
-
-
分别查看一下对应的启动日志
-
docker logs rmqbroker
-
docker logs rmqserver
-
-
我们在安装一个UI管理工具
-
docker pull styletang/rocketmq-console-ng
-
docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr=172.17.0.3:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng
-
(备注:)运行容器中的addr地址,是安排rmqserver服务docker的ip
-
docker exec -it rmqserver bash //进入rmqserver服务的docker
-
cat /etc/hosts //得到ip,将其填充到容器运行命令中即可,其实后面在UI页面也可以改。
-
-
-
到这里基本上就已经部署完成了,下面把broker的配置文件一起贴上
-
brokerIP1=192.168.0.150 namesrvAddr=192.168.0.150:9876 brokerName=ninja_broker_name
-
-
记得访问下服务器8080端口,会出现相应的管理页面且无报错
我感觉你在无中生有
呵!如果发现哪里不对劲来这里看就对了
当我发现,我用JavaApi访问的时候,抛出连接超时异常,???
package com.example.rocketmq.producer; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; public class MyProducer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer producer = new DefaultMQProducer("ninja_connect"); //连接nameserver producer.setNamesrvAddr("192.168.0.150:9876"); //连接 producer.start(); //创建一个Topic producer.createTopic("ninja_broker_name", "ninja_topic_name", 8); for(int i = 0; i <= 10; i++){ Message message = new Message("ninja_topic_name","ninja_tags",("这是body部分" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult send = producer.send(message); System.out.printf("%s%n" , send); } System.out.println("创建topic成功"); //关闭连接 producer.shutdown(); } } //没有抛出异常可以去UI页面的消息队列查看相关消息
-
我第一时间想到我防火墙是否开放对应端口,索性一锅端,砸烂防火墙
-
然后我重启镜像,wtf ?还是不行,我brocker.conf难道没有生效?
-
进入容器内部,一探究竟,这一探就坏事了啊,我他妈发现映射路径完全对不上
-
我又心生一计,我去你妈的文件映射,端了,容器内部配置文件启动
-
我又发现在指定目录下的下下下下级目录下有一个长的有点像配置文件的玩意在涩涩发抖
-
虽然长的有点像,但除了他也别别的文件更像了,操作一盘,一梭子配置追加
-
回到bin目录 ,配置文件重启容器,???Address alread use,以我三年级的英语水平我读懂了这句话,难道我宿主机配置其实一直在默默发力,我收拾了一下有点崩的心态回到宿主机
-
cat一下配置,发现文件开头一个i在角落瑟瑟发抖,呵,我僵硬的笑了。
-
进入宿主机,停止容器,重启容器,行云流水 查看启动日志,一切正常
-
再次测试,看到UI页面的success,我心神领会的笑了
-
这一通操作让我发现一个问题,这容器内的文件文件映射路径都是错的,他怎么读取我宿主机上的配置文件,我怀揣着这个疑惑,缓缓睡去,少年心想,难道这是魔法?
JavaAPI之Message
我们可以看到:producer.send(new Message()),我们对Message做一个学习
消息的生产与消费
同步发送消息
package com.example.rocketmq.producer; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; //同步发送消息 public class SyncProducer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer producer = new DefaultMQProducer("ninja_connect"); producer.setNamesrvAddr("192.168.0.150:9876"); producer.start(); String msgStr = "用户A发送消息给用户B"; Message msg = new Message("ninja_topic_name","SEND_MSG", msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 发送消息 SendResult sendResult = producer.send(msg); System.out.println("消息状态:" + sendResult.getSendStatus()); System.out.println("消息id:" + sendResult.getMsgId()); System.out.println("消息queue:" + sendResult.getMessageQueue()); System.out.println("消息offset:" + sendResult.getQueueOffset()); producer.shutdown(); } }
异步发送消息
package com.example.rocketmq.producer; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; //异步发送消息 public class AsyncProducer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ninja_connect"); producer.setNamesrvAddr("192.168.0.150:9876"); // 发送失败的重试次数 producer.setRetryTimesWhenSendAsyncFailed(0); producer.start(); String msgStr = "用户A发送消息给用户B"; Message msg = new Message("ninja_topic_name","SEND_MSG", msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 异步发送消息 producer.send(msg, new SendCallback() { //如果发送成功执行的回调 @Override public void onSuccess(SendResult sendResult) { System.out.println("消息状态:" + sendResult.getSendStatus()); System.out.println("消息id:" + sendResult.getMsgId()); System.out.println("消息queue:" + sendResult.getMessageQueue()); System.out.println("消息offset:" + sendResult.getQueueOffset()); } //如果发送失败执行的回调 @Override public void onException(Throwable e) { System.out.println("发送失败!" + e); } }); System.out.println("发送成功!"); //producer.shutdown(); } } //注意:异步发送不能在下面执行关闭,因为是异步,同步代码优先执行,还没发送,酒吧连接通道关闭了
消息消费
package com.example.rocketmq.Consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.List;
//消息的消费者
public class MyConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("HAOKE_IM");
consumer.setNamesrvAddr("192.168.0.150:9876");
// 订阅topic,接收此Topic下的所有消息
consumer.subscribe("ninja_topic_name", "*");
//订阅topic,完整匹配
//consumer.subscribe("ninja_topic_name", "SEND_MSG");
//订阅topic,或
//consumer.subscribe("ninja_topic_name", "SEND_MSG || SEND_MSG1");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
System.out.println(new String(msg.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
System.out.println("收到消息->" + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
消息过滤器
RocketMQ支持根据用户自定义属性进行过滤,过滤表达式类似于SQL的where,如:a> 5 AND b ='abc',默认没有开启,需要手动开启,在broker.conf中追加配置,开启过滤器
enablePropertyFilter=true
-
定一个消息生产者
package com.example.rocketmq.Filter;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
public class FilterProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("filter_group");
producer.setNamesrvAddr("192.168.0.150:9876");
producer.start();
String msgStr = "我是编号为007的张三";
Message msg = new Message("ninja_topic_name","SEND_MSG",
msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.putUserProperty("age", "23");
msg.putUserProperty("sex", "男");
// 发送消息
SendResult sendResult = producer.send(msg);
System.out.println("消息状态:" + sendResult.getSendStatus());
System.out.println("消息id:" + sendResult.getMsgId());
System.out.println("消息queue:" + sendResult.getMessageQueue());
System.out.println("消息offset:" + sendResult.getQueueOffset());
System.out.println(sendResult);
producer.shutdown();
}
}
-
再定义消息消费者
package com.example.rocketmq.Filter;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class FilterConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_group");
consumer.setNamesrvAddr("192.168.0.150:9876");
// 订阅topic,接收此Topic下的所有消息
consumer.subscribe("ninja_topic_name", MessageSelector.bySql("age>=20 AND sex='男'"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
System.out.println(new String(msg.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
System.out.println("收到消息->" + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
Producer详解
顺序消费
consumer在消费消息时,需要按照生产者发送消息的顺序进行消费,如果消息顺序发生改变,带来的业务错误不不允许的,
场景:生产者推送一条订单相关的消息,这条消息中包括了订单创建、订单付款、订单状态...等多个需求,这些数据根据订单id绑定到一个订单上,我们将其推送到MQ,如果不加设置,MQ会将其数据打乱,算法分配到任意的队列之中,万一存放订单创建需求的队列还没消费订单创建,而另一个队列却在执行该订单的付款业务,这怕是不得行哦?
顺序消费可以使得一个信息指定发送到那个队列中,而那个队列执行的线程就会执行相关的代码,不会发生上面那种顺序错位的现象
-
创建消息生产者
package com.example.rocketmq.order;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
public class OrderProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("order_group");
producer.setNamesrvAddr("192.168.0.150:9876");
producer.start();
//场景:10个订单对应100条消息,每个订单有10条订单相关的消息,实现顺序消费
for (int i = 0; i < 100; i++) {
//模拟生成订单,我们只有10个订单: 0-9
int orderId = i % 10;
String msgStr = "order --> " + i + ",orderId为" + orderId;
Message message = new Message("ninja_topic_name", "ORDER_MSG",
msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(message, (mqs, msg, arg) -> {
//这个函数中的arg,就是send方法中最后一个参数orderId
Integer id = (Integer) arg;
//用 orderId对topic下的队列数进行取模,队列数默认为4
int index = id % mqs.size(); //无非只有以下结果 0、1、2、3
return mqs.get(index); //拿到我们要存放数据的消息队列
}, orderId);
System.out.println(sendResult);
}
producer.shutdown();
}
}
-
创建消息消费者
package com.example.rocketmq.order;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class OrderConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_group");
consumer.setNamesrvAddr("192.168.0.150:9876");
consumer.subscribe("ninja_topic_name", "ORDER_MSG");
//传入的是MessageListenerOrderly对象
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
try {
String sendResult = new String(msg.getBody(), "UTF-8");
int queryId = msg.getQueueId();
System.out.println(Thread.currentThread().getName() + " 队列Id为 >>"+ queryId +" 接收到新消息 >> " + sendResult);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
}
}
-
给大家看看控制台,是否是顺序消费
-
可以发现一共有四个队列(默认),每个线程对应一个队列,,不会去插手别的队列
-
相同订单id消息会落到同一个队列中,一个消费者线程会顺序消费队列,实现顺序消费。
-
分布式事务消息
-
可以看看以前写过的一个列子,关于RabbitMQ实现分布式事务:RabbitMQ分布式事务
-
但我们今天学习的RocketMQ,还是单独抽离出来,以至于后面两者或者三者的对比,我们另开篇幅
-
在RokcetMQ中,解决分布式事务的原理
RocketMQ中有一个概念,叫做"半消息",即生产者已经将消息推送给MQ,但是在MQ没有再次收到生产者的二次确认下,是不会将消息泄露给消费者让其消费的,这个消息的状态被名为“暂不能投递”状态
如果某个暂不能投递的消息一直没有被二次确认(可能是生产者宕机,也可能是网络原因等),RocketMQ会主动向消息生产者询问该消息的最终状态,(可以解决网络原因导致的二次确定丢失)这个过程就是RocketMQ的消息回查机制
-
执行流程为
-
生产者向MQ推送消息
-
MQ收到消息,进行持久化后ACK回执给消息生产者
-
消息生产者收到ACK回执,执行本地事务逻辑
-
生产者本地事务执行完毕,根据事务的成与否进行二次确认,可以是回滚尅是提交,如果二次确认是回滚,MQ会将消息删除,如果是提交,MQ会修改消息的状态为可投递消息,然后被消息消费者消费
-
-
创建生产者
package com.example.rocketmq.transecation;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.io.UnsupportedEncodingException;
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
TransactionMQProducer producer = new TransactionMQProducer("transaction_group");
producer.setNamesrvAddr("192.168.0.150:9876");
// 设置事务监听器
producer.setTransactionListener(new TransacationListennerImpl());
producer.start();
producer.createTopic("ninja_broker_name", "pay_topic", 4);
// 发送消息
Message message = new Message("pay_topic", "用户A给用户B转账1000元".getBytes("UTF-8"));
producer.sendMessageInTransaction(message, null);
Thread.sleep(999999);
producer.shutdown();
}
}
-
本地事务处理类,也是二次确认出发点
package com.example.rocketmq.transecation;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.HashMap;
import java.util.Map;
public class TransacationListennerImpl implements TransactionListener {
private static Map<String, LocalTransactionState> STATE_MAP = new HashMap<>();
//执行具体的业务逻辑的钩子
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
try{
System.out.println("A账户减少1000元");
//测试事务的提交与回滚
int i = 1 / 0;
System.out.println("B账户增加1000元");
STATE_MAP.put(message.getTransactionId(), LocalTransactionState.COMMIT_MESSAGE);
//二次提交为:确认提交
return LocalTransactionState.COMMIT_MESSAGE;
//二次提交为:测试状态回查,我们不提交
//return LocalTransactionState.UNKNOW;
}catch(Exception e){
e.printStackTrace();
}
STATE_MAP.put(message.getTransactionId(), LocalTransactionState.ROLLBACK_MESSAGE);
//二次提交确认回滚
return LocalTransactionState.ROLLBACK_MESSAGE;
}
//消息回查
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
String transactionId = messageExt.getTransactionId();
System.out.println("消息回查" + transactionId + ">>>" + STATE_MAP.get(transactionId));
//响应给MQ,该事务是否提交
return STATE_MAP.get(messageExt.getTransactionId());
}
}
-
创建消费者
package com.example.rocketmq.transecation;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("transaction_group");
consumer.setNamesrvAddr("192.168.0.150:9876");
consumer.subscribe("pay_topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
System.out.println(new String(msg.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
-
可以发现
当我们的业务抛出异常,事务没有提交的时候,消费者是不会得到消息的
Consumer详解
-
说完消息生产者,下面我们来学习一下消费消费者相关的知识
-
MQ和消息消费者之间到底谁主动啊,到底是推送还是拉取啊?
Push模式和Pull模式
这里容易造成一种误解,就是真的到底是谁在主动?看这两个词答案显而易见,但却不是这么简单,在RocketMQ中,Push和Pull都是通过消费者轮询主动拉取的方式来消费消息的,至于原因嘛:因为MQ服务推送压力过大,他们之间的区别在于:
-
push:是服务端将轮询封装了注册了一个MessageListener 监听器,如果轮询查到有消息,则唤醒监听器的consumeMessage 来消费消息,给人一种看上是MQ推送过来的,但却是消费者自己暗地里拉的()
-
pull:取消息的过程需要用户自己写,首先根据Topic拿到要消费的消息队列的集合,遍历这个集合得到每一个队列,然后针对每个消息队列批量消费消息,一次取完后,记录该队列下一次要取的开始消息偏移量(offset),直到取完了,再换另一个消息队列。
-
都是轮询,多久刷新一次呢?这能不能数据的实时性呢?
在上面我们已经说到:消息消费者通过轮询的方式实现数据的流通,但是数据的实时性没有得到保障,RocketMQ通过长轮询的方式,解决了这个问题
-
长轮询
消息消费者再轮询请求的过程中,若是MQ中的数据并没有更新,那么就将这个连接阻塞挂起,直到MQ中有新的数据时或者超时,才响应给消息消费者,再关闭连接,当消费者消费了这条消息后再向MQ发起新的请求
消息模式
看看我们之前写的代码:
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("transaction_group");
参数是消费者分组名称,我们一直没有说他的作用何在,现在对其进行分析
-
消费者分组就是为了应对MQ消息分发而出现的,当有三个消费者的被分到同一个组时,MQ的消息到底该分给组内的哪一个消费者呢?
-
一共有两种分发模式: 集群模式(默认模式)、广播模式
集群模式
同一个组内的消费者只消费所订阅消息的一部分,整个消费者分组内的消费者合起来才是所订阅的Topic的完整数据,实现了负载均衡的目的
consumer.setMessageModel(MessageModel.CLUSTERING);
广播模式
同一个组内的消费者消费所订阅消息的所有数据,也就是一条消息多个消费
consumer.setMessageModel(MessageModel.BROADCASTING);
生产重复消息的问题
-
场景:
-
当我们的生产者因为网络或者什么bug导致一条消息呗重复生产了两次或者两次以上,可如果消费者不会造成啥危害还好说,如果是订单退款怎么办,那不是都要多退一份乃至多份?那第二天你可能就出名了,杀了祭天都是轻松的。
-
-
解决思路
-
消费端处理消息的业务逻辑包吃冥等性(只能消费端实现)
-
保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现 ,即 利用一张日志表来记录已经处理成功的消息ID,当再次处理时,查询该ID是都在日志表中,若在,不做处理即可(MQ可以实现,但是压力会变大,也可以消费端实现)
-
-
所以:还是消费端自己处理消息重复的问题
RockerMQ的存储
同步刷盘与异步刷盘
在消息生产者推送消息给MQ的时候,数据保存在MQ有两种方式
-
同步刷盘:
-
消息生产者推送消息给MQ,消息写入MQ内存中时,调用对应刷盘线程开始刷盘,MQ刷盘到本地磁盘,刷盘完成后唤醒主线程,响应推送成功回执(数据安全性高,低吞吐)
-
-
异步刷盘:
-
消息生产者推送消息给MQ,消息写入MQ内存中,响应推送成功回执,当内存中的消息量达到一定体积时,调用刷盘线程统一刷盘(高吞吐,数据安全性较低)
-
RocketMQ的重试策略
在消息的生产和消费中,都可能会出现一些错误(网络异常啥的),一旦出现这种错误,就会进行错误重试,这种重试耗时2分钟,分别是生产者端和消费者端的重试
Producer端重试
package com.example.rocketmq.retry;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RetryProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("retry_group");
producer.setNamesrvAddr("192.168.0.150:9876");
//消息发送失败时,重试3次
producer.setRetryTimesWhenSendFailed(3);
producer.start();
producer.createTopic("ninja_broker_name", "retry_topic", 4);
String msgStr = "用户A发送消息给用户B";
Message msg = new Message("retry_topic", "SEND_MSG",
msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息,并且指定超时时间
SendResult sendResult = producer.send(msg, 1000);
System.out.println("消息状态:" + sendResult.getSendStatus());
System.out.println("消息id:" + sendResult.getMsgId());
System.out.println("消息queue:" + sendResult.getMessageQueue());
System.out.println("消息offset:" + sendResult.getQueueOffset());
System.out.println(sendResult);
producer.shutdown();
}
}
Consumer端重试
消费者端的重试分为两种情景:exception 、 timeout
exception 情景
消息消费的状态分为成功和失败,如果失败就会在1s 5s 10s 30s...重试到2h为止,这当然是消息消费端的事,当然我们不需要这么多次的重试,我们可以通过
package com.example.rocketmq.retry; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; import java.util.List; public class ReteyConsumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry_group"); consumer.setNamesrvAddr("192.168.0.150:9876"); consumer.subscribe("retry_topic", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { try { System.out.println(new String(msg.getBody(), "UTF-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } System.out.println("收到消息->" + msgs); if(msgs.get(0).getReconsumeTimes() >= 3){ // 重试3次后,不再进行重试 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }); consumer.start(); } }
timeout情景
这种情景是MQ在响应数据给消息消费者的路上就被半路截胡,消息压根没到消息消费者那里,自然也就无法收到消息消费者拿到消息消费后的回执,MQ将这种情景定义为超时
SpringBoot整合RocketMQ
普通消息
-
pom依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.0.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>rocketmq</artifactId> <version>0.0.1-SNAPSHOT</version> <name>rocketmq</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
-
定义消息生产者
package com.example.rocketmq.boot;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class SpringProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendMsg(String topic, String message){
rocketMQTemplate.convertAndSend(topic,message);
}
}
-
定义消息消费者
package com.example.rocketmq.boot;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "spring_topic",
consumerGroup = "spring_group",
selectorExpression = "*")
public class SpringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("接收到消息" + s);
}
}
-
定义启动类
package com.example.rocketmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RocketmqApplication {
public static void main(String[] args) {
SpringApplication.run(RocketmqApplication.class, args);
}
}
-
编写测试用列
package com.example.rocketmq.boot;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class SpringProducerTest {
@Autowired
private SpringProducer springProducer;
@Test
void sendMsg() {
springProducer.sendMsg("spring_topic","Hello World");
}
}
-
我们先启动SpringBoot,在启动测试类进行测试,可以看到控制台打印消息
事务消息
-
pom依赖:注意不要使用新的版本,API有改动,相关网上资料很少
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency>
-
本地事务监听
package com.example.rocketmq.boot.transacation;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import java.util.HashMap;
import java.util.Map;
@RocketMQTransactionListener(txProducerGroup="transactionGroup")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
//假设这是一个日志表,我们可以基于这个表,通过事务ID查询事务的提交状态,回查时需要
private static Map<String, RocketMQLocalTransactionState> STATE_MAP = new
HashMap<>();
//执行本地事务,并返回本地事务执行状态
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
try {
System.out.println("调用扣钱服务-1000");
Thread.sleep(600);
System.out.println("调用加钱服务+1000");
Thread.sleep(600);
STATE_MAP.put(transId, RocketMQLocalTransactionState.COMMIT);
//二次确认为 提交
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
e.printStackTrace();
}
STATE_MAP.put(transId, RocketMQLocalTransactionState.ROLLBACK);
//二次确认为 回滚
return RocketMQLocalTransactionState.ROLLBACK;
}
//消息回查,并返回本地事务执行状态
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
System.out.println("回查消息 -> transId = " + transId + ", state = " + STATE_MAP.get(transId));
//消息回查。二次确认
return STATE_MAP.get(transId);
}
}
-
定义消息生产者
package com.example.rocketmq.boot.transacation;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class SpringTransacationProducer {
//和监听器上的注解给值:rocketMQTemplateBeanName = "transactionGroup")定义一致
private final String TX_GROUP = "transactionGroup";
private final String TX_TOPIC_NAME = "spring_tx_topic";
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendMsg(String msg) {
Message message = MessageBuilder.withPayload(msg).build();
TransactionSendResult transactionGroup = this.rocketMQTemplate.sendMessageInTransaction(TX_GROUP, TX_TOPIC_NAME,message, null);
System.out.println("发送消息成功");
}
}
-
定义消息消费者
package com.example.rocketmq.boot.transacation;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "spring_tx_topic",
consumerGroup = "spring_tx_group",
selectorExpression = "*")
public class SpringTransacationConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("接收到消息 -> " + s);
}
}
-
配置文件application.yml
server:
port: 8080
rocketmq:
name-server: 192.168.0.150:9876
producer:
group: boot_group
-
编写测试用列
package com.example.rocketmq.boot.transacation;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import static org.junit.jupiter.api.Assertions.*;
@SpringBootTest
class SpringTransacationProducerTest {
@Autowired
private SpringTransacationProducer transactionProducer;
@Test
void sendMsg() {
transactionProducer.sendMsg("Hello World");
}
}
-
先把SpringBoot项目启动后,在运行测试用列进行测试即可,至于回查啊那些和前面一样,自己写点小bug进行测试即可
>>>>>>>>>>>>>>>>>>>>>>>>>>补充分割线>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>