zoukankan      html  css  js  c++  java
  • Java操作RockeMQ

    RocketMQ是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给Apache基金会,已经于2016年11月成为 Apache 孵化项目,相信RocketMQ的未来会发挥着越来越大的作用,将有更多的开发者因此受益。

            本文仅对RocketMQ的简单实用做入门性介绍,不对RocketMQ的底层原理进行深入介绍,后续文章将对RocketMQ的原理做详细介绍。

            RocketMQ的Maven依赖:

    <!-- https://mvnrepository.com/artifact/com.alibaba.rocketmq/rocketmq-client -->
    <dependency>
        <groupId>com.alibaba.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>3.2.6</version>
    </dependency>

    MQ的消费类RocketMQConsumer.java:

    package com.lance.rocketMQ.RocketMQ;
     
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListener;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
     
    import java.util.UUID;
     
     
    /**
     * Created by lance on 2017/2/10.
     */
    public class RocketMQConsumer {
     
        private DefaultMQPushConsumer consumer;
     
        private MessageListener listener;
     
        protected String nameServer;
     
        protected String groupName;
     
        protected String topics;
     
        public RocketMQConsumer(MessageListener listener, String nameServer, String groupName, String topics) {
            this.listener = listener;
            this.nameServer = nameServer;
            this.groupName = groupName;
            this.topics = topics;
        }
     
        public void init() {
            consumer = new DefaultMQPushConsumer(groupName);
            consumer.setNamesrvAddr(nameServer);
            try {
                consumer.subscribe(topics, "*");
            } catch (MQClientException e) {
                e.printStackTrace();
            }
            consumer.setInstanceName(UUID.randomUUID().toString());
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            consumer.registerMessageListener((MessageListenerConcurrently) this.listener);
     
            try {
                consumer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
            System.out.println("RocketMQConsumer Started! group=" + consumer.getConsumerGroup() + " instance=" + consumer.getInstanceName()
            );
        }
     
     
    }

    MQ消息的监听接口类RocketMQListener.java

    package com.lance.rocketMQ.RocketMQ;
     
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.common.message.MessageExt;
     
    import java.util.List;
     
    /**
     * Created by lance on 2017/2/10.
     */
    public class RocketMQListener  implements MessageListenerConcurrently {
     
     
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    //        System.out.println("get data from rocketMQ:" + msgs);
            for (MessageExt message : msgs) {
     
                String msg = new String(message.getBody());
                System.out.println("msg data from rocketMQ:" + msg);
            }
     
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }

    MQ消息的生产者类RocketMQProducer.java

    package com.lance.rocketMQ.RocketMQ;
     
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.client.producer.SendStatus;
    import com.alibaba.rocketmq.common.message.Message;
     
    import java.util.UUID;
     
    /**
     * Created by lance on 2017/2/10.
     */
    public class RocketMQProducer {
     
        private DefaultMQProducer sender;
     
        protected String nameServer;
     
        protected String groupName;
     
        protected String topics;
     
        public void init() {
            sender = new DefaultMQProducer(groupName);
            sender.setNamesrvAddr(nameServer);
            sender.setInstanceName(UUID.randomUUID().toString());
            try {
                sender.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
        }
     
        public RocketMQProducer(String nameServer, String groupName, String topics) {
            this.nameServer = nameServer;
            this.groupName = groupName;
            this.topics = topics;
        }
     
        public void send(Message message) {
     
            message.setTopic(topics);
     
            try {
                SendResult result = sender.send(message);
                SendStatus status = result.getSendStatus();
                System.out.println("messageId=" + result.getMsgId() + ", status=" + status);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

     测试RocketMQ的消费 RocketMQConsumerTest.java

    package com.lance.rocketMQ.RocketMQ;
     
    /**
     * Created by lance on 2017/2/10.
     */
    public class RocketMQConsumerTest {
     
     
        public static void main(String[] args) {
     
     
            String mqNameServer = "172.10.254.2:9876";
            String mqTopics = "MQ-MSG-TOPICS-TEST";
     
            String consumerMqGroupName = "CONSUMER-MQ-GROUP";
            RocketMQListener mqListener = new RocketMQListener();
            RocketMQConsumer mqConsumer = new RocketMQConsumer(mqListener, mqNameServer, consumerMqGroupName, mqTopics);
            mqConsumer.init();
     
     
            try {
                Thread.sleep(1000 * 60L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
     
        }
    }

    测试RocketMQ的消费 RocketMQConsumerTest.java

    package com.lance.rocketMQ.RocketMQ;
     
    /**
     * Created by lance on 2017/2/10.
     */
    public class RocketMQConsumerTest {
     
     
        public static void main(String[] args) {
     
     
            String mqNameServer = "172.10.254.2:9876";
            String mqTopics = "MQ-MSG-TOPICS-TEST";
     
            String consumerMqGroupName = "CONSUMER-MQ-GROUP";
            RocketMQListener mqListener = new RocketMQListener();
            RocketMQConsumer mqConsumer = new RocketMQConsumer(mqListener, mqNameServer, consumerMqGroupName, mqTopics);
            mqConsumer.init();
     
     
            try {
                Thread.sleep(1000 * 60L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
     
        }
    }
              run RocketMQConsumerTest.java 之后,控制台输出:

           

    RocketMQConsumer Started! group=CONSUMER-MQ-GROUP instance=1eb7d308-4414-4658-90b5-e2cae3b793eb
                结果分析: 此时MQ对应的TOPIC中并没有响应的消息,故收不到消息,仅看到MQ消费者正常启动信息。

           

            MQ的生产者测试类:RocketMQProducerTest.java

    package com.lance.rocketMQ.RocketMQ;
     
    import com.alibaba.rocketmq.common.message.Message;
     
    /**
     * Created by lance on 2017/2/10.
     */
    public class RocketMQProducerTest {
     
        public static void main(String[] args) {
     
            String mqNameServer = "172.10.254.2:9876";
            String mqTopics = "MQ-MSG-TOPICS-TEST";
     
            String producerMqGroupName = "PRODUCER-MQ-GROUP";
            RocketMQProducer mqProducer = new RocketMQProducer(mqNameServer, producerMqGroupName, mqTopics);
            mqProducer.init();
     
     
            for (int i = 0; i < 5; i++) {
     
                Message message = new Message();
                message.setBody(("I send message to RocketMQ " + i).getBytes());
                mqProducer.send(message);
            }
     
     
     
        }
     
    }

      run RocketMQProducerTest.java 之后,RocketMQProducerTest.java 对应的控制台输出为:

                    

    1. messageId=0A71290100002A9F00000003D0BB0832, status=SEND_OK
    2. messageId=0A71290100002A9F00000003D0BB08BB, status=SEND_OK
    3. messageId=0A71290100002A9F00000003D0BB0944, status=SEND_OK
    4. messageId=0A71290100002A9F00000003D0BB09CD, status=SEND_OK
    5. messageId=0A71290300002A9F000000005440AEED, status=SEND_OK
             结果分析:表明所有消息都已经正常发送,且被RocketMQ正常接收。

              此时查看RocketMQConsumerTest.java对应的控制台输出发生改变,输出内容变更如下:

             

    1. RocketMQConsumer Started! group=CONSUMER-MQ-GROUP instance=1eb7d308-4414-4658-90b5-e2cae3b793eb
    2. msg data from rocketMQ:I send message to RocketMQ 1
    3. msg data from rocketMQ:I send message to RocketMQ 0
    4. msg data from rocketMQ:I send message to RocketMQ 3
    5. msg data from rocketMQ:I send message to RocketMQ 2
    6. msg data from rocketMQ:I send message to RocketMQ 4

    看,简单吧!


    备注:小编自己使用了Apache版本的RocketMQ(即RocketMQ 4.*),发现只需要更改import的package的路径而已,不需要修改其他代码,请参考。


    RocketMQ的重复问题解决方式:
    a.MQ的消费端执行的操作具有幂等性,即无论多少次重复执行,其结果是一样的;
    b.MQ的消费端做重复校验,比如将受到MQ消息的唯一编号保存到Redis中,即每次收到消息时,将检查唯一编号是否已经在Redis中,如果存在说明消息重复;否则将唯一编号放入到Redis中,可以根据系统需要设置唯一编号在Redis中的过期时间,以防止Redis溢出。
  • 相关阅读:
    像素画
    随机世界生成2
    随机世界的生成
    unity2018使用tileMap生成地图 类似泰拉瑞亚创建和销毁地图块
    游戏反编译工具dnSpy
    unity物理学材质Physic Material
    bzoj3261: 最大异或和
    bzoj3524: [Poi2014]Couriers
    hdu2457:DNA repair
    poj2778:DNA Sequence
  • 原文地址:https://www.cnblogs.com/coder306/p/13087645.html
Copyright © 2011-2022 走看看