创建工程(Producer和Consumer)
导入依赖
<dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.0.10</version> </dependency> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-all</artifactId> <version>3.0.10</version> <type>pom</type> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>1.1.1</version> </dependency>
创建生产者
package com.wn.producer; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; public class MQProducer { public static void main(String[] args) throws MQClientException { DefaultMQProducer producer=new DefaultMQProducer("rmq-group"); producer.setNamesrvAddr("192.168.138.187:9876;192.168.138.188:9876"); producer.setInstanceName("producer"); producer.start(); try { for (int i=0;i<10;i++){ Thread.sleep(1000); //每秒发送一次 Message msg = new Message("itmayiedu-topic", // topic 主题名称 "TagA", // tag 临时值 ("itmayiedu-"+i).getBytes()// body 内容 ); SendResult sendResult=producer.send(msg); System.out.println(sendResult.toString()); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
创建消费者
package com.wn.consumer; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; import java.util.List; public class MQConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group"); consumer.setNamesrvAddr("192.168.138.187:9876;192.168.138.188:9876"); consumer.setInstanceName("consumer"); consumer.subscribe("itmayiedu-topic","TagA"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg:list){ System.out.println(msg.getMsgId()+"---"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started..."); } }
实现效果
执行producer和consumer
producer
consumer
列表中的信息如下: