生产者:
/** * 生产者 */ public class Provider { public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException { //创建一个生产者 DefaultMQProducer producer=new DefaultMQProducer("rmq-group"); //设置NameServer地址 producer.setNamesrvAddr("192.168.42.22:9876;192.168.42.33:9876"); //设置生产者实例名称 producer.setInstanceName("provider"); //启动生产者 producer.start(); //发送消息 for (int i = 1; i <=10 ; i++) { Thread.sleep(1000); //模拟网络延迟 //创建消息 topic代表主题名称 tags代表小分类 body代表消息体 Message message=new Message("weksoft_topic","TagA",("wdksoft-"+i).getBytes()); //发送消息 SendResult send = producer.send(message); System.out.println(send.toString()); } } }
消费者
/** * 消费者:监听消费 */ public class Consumer { public static void main(String[] args) throws MQClientException { //创建消费者 DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group"); //设置NameServer地址 consumer.setNamesrvAddr("192.168.42.22:9876;192.168.42.33:9876"); //设置实例名称 consumer.setInstanceName("consumer"); //订阅Topic consumer.subscribe("weksoft_topic","TagA"); //监听消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { //获取消息 for(MessageExt ext:msgs){ //RocketMQ由于是集群环境,所以产生的消息ID可能会重复 System.out.println(ext.getMsgId()+"----------"+new String(ext.getBody())); } //接受消息状态 1.消费成功 2.消费失败 队列还有 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动消费者 consumer.start(); } }
生产者生产消息
消费者消费消息
控制台多了入队和出队的记录