1.准备
首先要在本地安装RocketMQ,安装教程参考 RocketMQ安装。
2.创建SpringBoot项目集成RocketMQ
首先创建SpringBoot项目
1)pom引入依赖如下
<dependencies>
<!--rocketmq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
2)application.properties配置文件如下
server.port=8088
rocketmq.producer.groupName=ProducerGroup
rocketmq.producer.namesrvAddr=127.0.0.1:9876
rocketmq.producer.instanceName=ProducerGroup
rocketmq.producer.topic=topic2020
rocketmq.producer.tag=test
rocketmq.producer.maxMessageSize=131072
rocketmq.producer.sendMsgTimeout=10000
rocketmq.consumer.namesrvAddr=127.0.0.1:9876
rocketmq.consumer.groupName=ConsumerGroup
rocketmq.consumer.topic=topic2020
rocketmq.consumer.tag=test
rocketmq.consumer.consumeThreadMin=20
rocketmq.consumer.consumeThreadMax=64
3)创建MessageProcessor消息处理接口
package com.springboot.message;
import org.apache.rocketmq.common.message.MessageExt;
public interface MessageProcessor {
boolean handle(MessageExt messageExt);
}
4)实现MessageProcessorImpl消息处理类
package com.springboot.message.impl;
import com.springboot.message.MessageProcessor;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Service;
/**
* Description:监听消息处理类
*/
@Service
public class MessageProcessorImpl implements MessageProcessor {
@Override
public boolean handle(MessageExt messageExt) {
// 收到的body(消息体),字节类型,需转为String
String result = new String(messageExt.getBody());
System.out.println("监听到了消息,消息为:"+ result);
return true;
}
}
5)创建MessageListen消息监听类
package com.springboot.message;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* Description:监听类
*/
@Component
public class MessageListen implements MessageListenerConcurrently {
@Autowired
private MessageProcessor messageProcessor;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
MessageExt ext = list.get(0);
boolean result = messageProcessor.handle(ext);
if (!result) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
6)创建消息生产者RocketMQProducer
package com.springboot.consumer; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Description:生产者配置 */ @Configuration @Slf4j public class RocketMQProducer { @Value("${rocketmq.producer.groupName}") private String groupName; @Value("${rocketmq.producer.namesrvAddr}") private String nameserAddr; @Value("${rocketmq.producer.instanceName}") private String instanceName; @Value("${rocketmq.producer.maxMessageSize}") private int maxMessageSize; @Value("${rocketmq.producer.sendMsgTimeout}") private int sendMsgTimeout; @Bean(initMethod = "start", destroyMethod = "shutdown") public DefaultMQProducer getRocketMQProducer() { DefaultMQProducer producer = new DefaultMQProducer(groupName); producer.setNamesrvAddr(nameserAddr); producer.setInstanceName(instanceName); producer.setMaxMessageSize(maxMessageSize); producer.setSendMsgTimeout(sendMsgTimeout); producer.setVipChannelEnabled(false); log.info("================>生产者创建完成,ProducerGroupName{}<================", groupName); return producer; } }
(@Bean(initMethod = "start", destroyMethod = "shutdown"),在bean注解中开启/销毁生成者)
7)创建消息消费者RocketMQConsumer
package com.springboot.consumer; import com.springboot.message.MessageListen; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.exception.MQClientException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Description:消费者配置 */ @Configuration @Slf4j public class RocketMQConsumer { @Autowired private MessageListen messageListen; @Value("${rocketmq.consumer.namesrvAddr}") private String namesrvAddr; @Value("${rocketmq.consumer.groupName}") private String groupName; @Value("${rocketmq.consumer.topic}") private String topic; @Value("${rocketmq.consumer.tag}") private String tag; @Value("${rocketmq.consumer.consumeThreadMin}") private int consumeThreadMin; @Value("${rocketmq.consumer.consumeThreadMax}") private int consumeThreadMax; @Bean(initMethod = "start", destroyMethod = "shutdown") public DefaultMQPushConsumer getRocketMQConsumer() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName); consumer.setNamesrvAddr(namesrvAddr); consumer.setConsumeThreadMin(consumeThreadMin); consumer.setConsumeThreadMax(consumeThreadMax); consumer.setVipChannelEnabled(false); // 我们自己实现的监听类 consumer.registerMessageListener(messageListen); try { consumer.subscribe(topic,tag); log.info("================>消费者创建完成,ConsumerGroupName{}<================",groupName); log.info("============>消费者监听开始,groupName:{},topic:{}<============",groupName,topic); } catch (MQClientException e) { log.error("消费者启动失败"); e.printStackTrace(); } return consumer; } }
(@Bean(initMethod = "start", destroyMethod = "shutdown"),在bean注解中开启/销毁消费者)
8)创建controller类,RocketMqController
package com.springboot.controller; import com.springboot.consumer.RocketMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.text.SimpleDateFormat; import java.util.Date; /** * Description: */ @RestController public class RocketMqController { @Autowired @Qualifier("rocketMQProducer") RocketMQProducer rocketMQProducer; @GetMapping("/test") public void TestSend() { DefaultMQProducer producer = rocketMQProducer.getRocketMQProducer(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String body = "hi RocketMQ, now is " + sdf.format(new Date()) + "."; Message message = new Message("topic2020", "test", body.getBytes()); try { producer.send(message); } catch (Exception e) { e.printStackTrace(); } } }
3.测试
1)首先启动本地RocketMQ(启动参考)
2)启动主启动类,在浏览器输入 localhost:8088/test,可以看到消费者已经创建并开始监听,并且已经消费消息
3)可视化图看出,在浏览器输入 localhost:7777