本地部署:
Windows配置启动:
1、 添加环境变量
ROCKETMQ_HOME="D:\rocketmq"
NAMESRV_ADDR="localhost:9876"
2、启动名称服务器
bin\mqnamesrv.cmd
3、启动Broker
bin\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
可视化控制台:https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console
mvn打包: mvn clean package -Dmaven.test.skip=true
启动:java -jar rocketmq-console-ng-2.0.0.jar
访问:localhost:8080
Spring整合RocketMQ
https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md
1、依赖
<!--rocketmq--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.8.0</version> </dependency>
2、配置生产者和消费者
@Configuration public class RocketmqConfig { /** * 实例化消息生产者Producer * start()方法用于启动Producer实例 * shutdown()方法关闭Producer实例 */ @Bean(initMethod = "start", destroyMethod = "shutdown") public MQProducer mqProducer() throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer("custom-rocketmq-producer"); // 设置NameServer的地址 producer.setNamesrvAddr("localhost:9876"); // 启动Producer实例 // producer.start(); return producer; } /** * 消费者配置 * * @return * @throws MQClientException */ @Bean public MQPushConsumer mqPushConsumer() throws MQClientException { // 实例化消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("custom-rocketmq-consumer"); // 设置NameServer的地址 consumer.setNamesrvAddr("localhost:9876"); // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息 consumer.subscribe("custom-topic1", "*"); // 注册监听器来处理消息,MyConsumer:自定义消息监听器 consumer.registerMessageListener(new MyConsumer()); // 启动消费者实例 consumer.start(); return consumer; } }
3、生产消息示例
@Autowired private MQProducer mqProducer; @Test void producerTest() throws RemotingException, MQClientException, InterruptedException, MQBrokerException { // 同步发送消息 Message syncMsg = new Message("custom-topic1", "sync_msg", "hello RocketMQ syncMsg to custom-topic1".getBytes()); mqProducer.send(syncMsg); // 异步发送消息 Message asyncMsg = new Message("custom-topic1", "async_msg", "hello RocketMQ asyncMsg to custom-topic1".getBytes()); mqProducer.send(asyncMsg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // do somethind } @Override public void onException(Throwable e) { // do somethind } }); // 发送延迟消息 Message delayMsg = new Message("custom-topic1", "delay_msg", ("Hello RocketMQ delayMsg to custom-topic1").getBytes()); // 设置延时等级5,这个消息将在1分钟之后收到(现在只支持固定的几个时间,详看delayTimeLevel) delayMsg.setDelayTimeLevel(5); // 发送消息 mqProducer.send(delayMsg); }
4、消费消息
/**
 * Custom concurrent message listener that prints the body of every received message.
 */
public class MyConsumer implements MessageListenerConcurrently {

    /**
     * Prints each message body and acknowledges the whole batch.
     *
     * <p>Bodies are decoded explicitly as UTF-8 instead of the platform default charset, so
     * the printed text does not vary with the JVM's locale settings.
     *
     * @param messageExts the batch of messages delivered to this listener
     * @param consumeConcurrentlyContext the consume context (unused)
     * @return always {@code CONSUME_SUCCESS}
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExts,
                                                    ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        for (MessageExt messageExt : messageExts) {
            String body = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("Receive New Messages:" + body);
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}
SpringBoot整合RocketMQ
https://github.com/apache/rocketmq-spring/wiki
要求:
JDK1.8及以上
Maven 3.0及以上
SpringBoot 2.0及以上
1、添加依赖(rocketmq-client 版本为4.8.0)
<!--add dependency in pom.xml--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.0</version> </dependency>
2、配置namesrv 和 生产组
## application.properties
# 支持配置多个nameserver地址,采用;分隔即可
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
3、发送消息
@Component public class TemplateProducer { @Autowired private RocketMQTemplate rocketMQTemplate; public void producer() { // 同步发送消息 rocketMQTemplate.convertAndSend("test", "Hello, World sync!"); // 异步发送消息 rocketMQTemplate.asyncSend("test", "Hello, World async!", new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("async onSucess SendResult=%s %n", sendResult); } @Override public void onException(Throwable e) { System.out.printf("async onException Throwable=%s %n", e); } }); // 发送消息时指定TAG,通过destination参数指定,格式为topicName:tagName rocketMQTemplate.convertAndSend("test:tag1", "hello,msg of tag1"); //send spring message,Message参数包为: org.springframework.messaging.Message rocketMQTemplate.send("test", MessageBuilder.withPayload("Hello, World! I'm from spring message").build()); // 发送消息时如何设置消息的key,可以通过重载的xxxSend(String destination, Message<?> msg, ...)方法来发送消息,指定msg的headers来完成 String msgId = UUID.randomUUID().toString(); rocketMQTemplate.send("topic-test", MessageBuilder.withPayload("msg of key").setHeader(MessageConst.PROPERTY_KEYS, msgId).build()); // 发送顺序排序消息 rocketMQTemplate.syncSendOrderly("orderly_topic", MessageBuilder.withPayload("Hello, World").build(), "hashkey"); // 同步发送延迟消息,延迟levle为9,即5分钟。 rocketMQTemplate.syncSend("delay-topic", MessageBuilder.withPayload("hello delay msg").build(), 1000, 9); // 销毁rocketMQTemplate,注意:一旦销毁,就不能再使用rocketMQTemplate发送消息 // 不需要手动执行此方法,rocketMQTemplate会在spring容器销毁时自动销毁 //rocketMQTemplate.destroy(); } }
4、消费消息
1)Push模式
/**
 * Push-mode consumer for {@code custom-topic1}.
 *
 * <p>Implementing {@code RocketMQListener<String>} would deliver only the message payload;
 * {@code RocketMQListener<MessageExt>} delivers RocketMQ's native {@code MessageExt}.
 */
@Component
@RocketMQMessageListener(topic = "custom-topic1", consumerGroup = "my-consumer_delay-test-1")
public class PushConsumer implements RocketMQListener<MessageExt> {

    /**
     * Prints the body of the received message.
     *
     * <p>The body is decoded explicitly as UTF-8 instead of the platform default charset, so
     * the output does not depend on the JVM's locale settings.
     *
     * @param messageExt the received message
     */
    @Override
    public void onMessage(MessageExt messageExt) {
        String body = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("received message: " + body);
    }
}
2)Pull模式
从RocketMQ Spring 2.2.0开始,RocketMQ Spring支持Pull模式消费
①:application.properties 添加配置
rocketmq.consumer.group=my-group1
rocketmq.consumer.topic=test
②:代码中主动拉取消息示例
/** * pull模式消费消息 */ @Component public class PullConsumer { @Resource private RocketMQTemplate rocketMQTemplate; public void pullMessage() { //This is an example of pull consumer using rocketMQTemplate. List<String> messages = rocketMQTemplate.receive(String.class); System.out.printf("receive from rocketMQTemplate, messages=%s %n", messages); } }
END.