rocketmq的四个角色
producer,comsumer,broker,nameserver
rocketmq各个角色之间的关系
rocketmq使用前需要先新建topic,然后根据topic发送和接收消息
rocketmq集群方法
rocketmq使用demo
https://gitee.com/liran123/rocketmq_demo
同步,异步发送消息
public SendResult syncSend(Integer id) {
// 创建 Demo01Message 消息
Demo01Message message = new Demo01Message();
message.setId(id);
// 同步发送消息
return rocketMQTemplate.syncSend(Demo01Message.TOPIC, message);
}
public void asyncSend(Integer id, SendCallback callback) {
// 创建 Demo01Message 消息
Demo01Message message = new Demo01Message();
message.setId(id);
// 异步发送消息
rocketMQTemplate.asyncSend(Demo01Message.TOPIC, message, callback);
}
public void onewaySend(Integer id) {
// 创建 Demo01Message 消息
Demo01Message message = new Demo01Message();
message.setId(id);
// oneway 发送消息
rocketMQTemplate.sendOneWay(Demo01Message.TOPIC, message);
}
接收消息
@Component
@RocketMQMessageListener(
topic = Demo03Message.TOPIC,
consumerGroup = "demo03-consumer-group-" + Demo03Message.TOPIC
)
public class Demo03Consumer implements RocketMQListener<Demo03Message> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void onMessage(Demo03Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}
}
不同类型的消费者
DefaultMQPushConsumer处理流程
consumer启动和关闭流程
当consumer为pull模式,启动可以自己控制,关闭时需要保存offset,需要在异常处理阶段增加把offset写入磁盘的处理,记住了每次关闭的offset,才能保证消息准确性
push模式,
关闭时需要调用shutdown函数,释放资源,保存offset
生产者
默认使用DefaultMqProducer类
简单生产者
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("192.168.20.4:9876");
producer.start();
for (int i = 0; i < 10000; i++) {
try {
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
简单消费者
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("192.168.20.4:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest77777", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
nameserver
- nameserver是整个消息队列的状态服务器,各个角色要定期向nameserver上报自己的状态信息
- nameserver与各个组件交流的流程
topic
通过命令行操作进行创建
创建topic的命令会被发送到对应的broker,然后执行具体的逻辑
消息队列核心机制
服务器要将文件内容从磁盘发送到客户端,需要经历两个步骤
tmp_buf是预先申请的内存
从磁盘复制数据到内核态内存,从内核态内存复制到用户内存 完成read操作
从用户态内存复制到网络驱动的内核态内存,最后从网络驱动的内核台内存复制到网卡中进行传输 完成write操作
通过mmap方式,可以省去向用户态内存复制,提高速度
在java7中的mappedbytebuffer实现
顺序消息
分为全局顺序消息和部分顺序消息
rocket默认情况下会新建8个读队列,8个写队列,消息顺序不保证一致
全局有序
局部有序
发送端通过MessageQueueSelector类来控制把消息发往哪个MessageQueue
消费者端通过MessageListenerOrderly解决/messageQueue消息被并发处理的问题
在MessageListenerOrderly实现中,通过为每个Queue加锁,消费每个消息前都要先获取锁, 保证同一时间,同一个Queue不会被重复消费,但不同的queue可以并发处理
消息重复问题
![](https://img2020.cnblogs.com/blog/924254/202102/924254-20210219165648328-396085952.png
消息优先级的问题
broker端进行消息过滤:减少流到consumer的消息
通过tag,key,sql表达式等方式过滤
一个应用最好使用一个topic,然后不同类型的消息子类型用Tag来标识,服务端基于tag进行过滤,并不需要读取消息体的内容,效率比较高
key一般用消息在业务层面的唯一标识码来表示,尽量使key唯一