一、步骤分析
- 消息发送者步骤分析
- 创建消息生产者producer,并指定生产者组名
- 指定NameServer地址
- 启动producer完成初始化
- 创建消息对象,指定主题Topic、Tag和消息体
- 发送消息
- 关闭生产者producer
- 消息消费者者步骤分析
- 创建消费者Consumer,指定消费者组名
- 指定NameServer地址
- 订阅主题Topic和Tag
- 设置回调函数,处理消息
- 启动消费者consumer
二、消息发送
- 同步消息
这种可靠性同步地发送方式使用比较广泛,比如重要的消息通知,短信通知。
// 创建消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("生产者组名");
// 指定NameServer地址
producer.setNamesrvAddr("localhost:8080");
// 启动Producer实例
producer.start();
// 创建消息对象,指定主题Topic、Tag和消息体
Message msg = new Messge("Topic","Tag","Message body".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息
SendResult sendResult = producer.send(msg);
// 关闭生产者producer
producer.shutdown();
- 异步消息
通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker响应。
// 发送异步消息
producer.send(msg, new SendCallback(){
onSuccess(SendResult sendResult){};
onException(){Throwable e};
});
- 单向消息
主要用在不特别关心发送结果的场景,比如日志发送。
// 发送单向消息,没有任何返回结果
producer.sendOneway(msg);
三、消息接收
// 创建消费者Consumer,指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("消费者组名");
// 指定NameServer地址
consumer.setNamesrvAddr("192.168.22.222:8080;192.168.22.223:8080");
// 订阅主题Topic和Tag
consumer.subscribe("Topic", "Tag");
// 设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently(){
// msgs:接收消息内容
ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者consumer
consumer.start();
- 负载均衡模式
多个消费者共同消费队列消息,每个消费者处理的消息不同。
// 负载均衡模式消费
consumer.setMessageModel(MessageModel.clustering);
- 广播模式
每个消费者消费的消息都是相同的
// 广播模式消费
consumer.setMessageModel(MessageModel.broadcasting);