zoukankan      html  css  js  c++  java
  • RocketMQ 介绍与基本使用

    介绍

    RocketMQ是阿里巴巴自研的第三代分布式消息中间件,是阿里系下开源的一款分布式、队列模型的消息中间件,原名Metaq,3.0 版本名称改为RocketMQ,是阿里参照kafka设计思想使用Java实现的一套MQ。同时将阿里系内部多款MQ产品(Notify、metaq)进行整合,只维护核心功能,去除了所有其他运行时依赖,保证核心功能最简化,在此基础上配合阿里上述其他开源产品实现不同场景下MQ的架构。

    2016年11月,阿里将RocketMQ捐献给Apache软件基金会,正式成为孵化项目。阿里称会将其打造成顶级项目。

    2017年2月20日,RocketMQ正式发布4.0版本,专家称新版本适用于电商领域,金融领域,大数据领域,兼有物联网领域的编程模型。

    相关地址

    小试牛刀

    可通过自己下载源码编译或下载编译好的文件,地址见上。
    假设是自己下载源码进行编译

    下载源码并进行编译

    > git clone https://github.com/apache/incubator-rocketmq.git
    > cd incubator-rocketmq
    > mvn clean package install -Prelease-all assembly:assembly -U
    > cd target/apache-rocketmq-all/
    

    Start Name Server

    > nohup sh bin/mqnamesrv &
    > tailf nohup.out
    

    Start Broker

    > nohup sh bin/mqbroker -n localhost:9876 &
    > tailf nohup.out
    

    注意如果这里启动失败,看一下内存是否足够,可以看一下“runbroker.sh”这个文件,对应的修改参数,如下

    JAVA_OPT="-server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"
    

    测试发送与接收

     > export NAMESRV_ADDR=localhost:9876
     > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
    
     > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
    

    关闭服务

    > sh bin/mqshutdown broker
    > sh bin/mqshutdown namesrv
    

    在Java项目中的使用

    pom.xml

    <properties>
        <rocketmq_ver>4.0.0-incubating</rocketmq_ver>
    </properties>
    <dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>${rocketmq_ver}</version>
    </dependency>
    </dependencies>
    

    生产者

    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    
    public class Producer {
    
        public static void main(String[] args) {
            DefaultMQProducer producer = new DefaultMQProducer("Producer");
            producer.setNamesrvAddr(Config.ADDR);
            try {
                producer.start();
    
                Message msg = new Message("PushTopic", "push", "1", "Just for push1.".getBytes());
    
                SendResult result = producer.send(msg);
                System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());
    
                msg = new Message("PushTopic", "push", "2", "Just for push2.".getBytes());
    
                result = producer.send(msg);
                System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());
    
                msg = new Message("PushTopic", "pull", "1", "Just for pull.".getBytes());
    
                result = producer.send(msg);
                System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                producer.shutdown();
            }
        }
    }
    

    消费者

    import java.util.List;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    
    public class Consumer {
        public static void main(String[] args) {
            DefaultMQPushConsumer consumer =
                    new DefaultMQPushConsumer("PushConsumer");
            consumer.setNamesrvAddr(Config.ADDR);
            try {
                //订阅PushTopic下Tag为push的消息
                consumer.subscribe("PushTopic", "push");
               /**
                * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
                * 如果非第一次启动,那么按照上次消费的位置继续消费
                */
                consumer.setConsumeFromWhere(
                        ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                consumer.registerMessageListener(
                        new MessageListenerConcurrently() {
                            public ConsumeConcurrentlyStatus consumeMessage(
                                    List<MessageExt> msgs,
                                    ConsumeConcurrentlyContext Context) {
                                for (Message msg : msgs) {
                                    System.out.println(new String(msg.getBody()) + ":" + msg.toString());
                                }
                                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                            }
                        }
                );
                consumer.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    结果

    id:C0A801663174723279CF77AF3C6E0000 result:SEND_OK
    id:C0A801663174723279CF77AF3C7B0001 result:SEND_OK
    id:C0A801663174723279CF77AF3C7D0002 result:SEND_OK
    
    Just for push1.:MessageExt [queueId=2, storeSize=184, queueOffset=14, sysFlag=0, bornTimestamp=1490348772974, bornHost=/192.168.127.1:53238, storeTimestamp=1490348775615, storeHost=/192.168.127.128:10911, msgId=C0A87F8000002A9F000000000002EDE8, commitLogOffset=191976, bodyCRC=1396413800, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=15, KEYS=1, CONSUME_START_TIME=1490348782880, UNIQ_KEY=C0A801663174723279CF77AF3C6E0000, WAIT=true, TAGS=push}, body=15]]
    Just for push2.:MessageExt [queueId=3, storeSize=184, queueOffset=14, sysFlag=0, bornTimestamp=1490348772987, bornHost=/192.168.127.1:53238, storeTimestamp=1490348775620, storeHost=/192.168.127.128:10911, msgId=C0A87F8000002A9F000000000002EEA0, commitLogOffset=192160, bodyCRC=2014758571, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=15, KEYS=2, CONSUME_START_TIME=1490348782882, UNIQ_KEY=C0A801663174723279CF77AF3C7B0001, WAIT=true, TAGS=push}, body=15]]
    

    参考

  • 相关阅读:
    java里如何使用输入流和输出流实现读取本地文件里内容和写出到本地文件里
    Windows 命令行基础(博主推荐)
    Python2.7编程基础(博主推荐)
    27 个Jupyter Notebook的小提示与技巧
    java里如何实现循环打印出字符或字符数组里的内容
    [转]angularjs之ui-grid 使用详解
    [转]AngularJS 实现 Table的一些操作(示例大于实际)
    [转]js 回车转成TAB(利用tabindex)
    [转] Entity Framework添加记录时获取自增ID值
    [转]使用依赖关系注入在 ASP.NET Core 中编写干净代码
  • 原文地址:https://www.cnblogs.com/powercto/p/6718035.html
Copyright © 2011-2022 走看看