zoukankan      html  css  js  c++  java
  • RocketMQ快速入门

    前面几篇文章介绍了为什么选择RocketMQ,以及与kafka的一些对比: 阿里 RocketMQ 优势对比,方便大家对于RocketMQ有一个简单的整体了解,之后介绍了:MQ 应用场景,让我们知道MQ在什么时候可以使用,可以解决什么问题,之后介绍了:RocketMQ集群部署配置;本篇文章接着上篇内容之后,来给大家介绍下RocketMQ快速入门。

    如何使用

    1、引入 rocketmq-client

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.1.0-incubating</version>
    </dependency>
    

    2、编写Producer

     DefaultMQProducer producer = new DefaultMQProducer("producer_demo");
           //指定NameServer地址
            producer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改为自己的
    
            /**
             * Producer对象在使用之前必须要调用start初始化,初始化一次即可
             * 注意:切记不可以在每次发送消息时,都调用start方法
             */
            producer.start();
    
            for (int i = 0; i < 997892; i++) {
                try {
                    //构建消息
                    Message msg = new Message("TopicTest" /* Topic */,
                        "TagA" /* Tag */,
                        ("测试RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                    );
    
                    //发送同步消息
                    SendResult sendResult = producer.send(msg);
    
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }
    
    
    producer.shutdown();
    

    3、编写Consumer

    /**
     * Consumer Group,非常重要的概念,后续会慢慢补充
     */
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_demo");
    //指定NameServer地址,多个地址以 ; 隔开
    consumer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改为自己的
    
    /**
     * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
     * 如果非第一次启动,那么按照上次消费的位置继续消费
     */
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
    consumer.subscribe("TopicTest", "*");
    
    consumer.registerMessageListener(new MessageListenerConcurrently() {
    
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
    	ConsumeConcurrentlyContext context) {
    	try {
    	    for(MessageExt msg:msgs){
    		String msgbody = new String(msg.getBody(), "utf-8");
    		System.out.println("  MessageBody: "+ msgbody);//输出消息内容
    	    }
    	} catch (Exception e) {
    	    e.printStackTrace();
    	    return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
    	}
    	return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
        }
    });
    
    
    consumer.start();
    
    System.out.printf("Consumer Started.%n");
    

    4、说明

    各位根据自己的环境,修改NamesrvAddr的值,我的集群请参考:RocketMQ集群部署配置。稍后通过RocketMQ管控台就可以看到之前搭建的多Master多Slave模式,异步复制集群模式。

    5、通过RocketMQ管控台

    rocketmq-console-ng获取方式为:rocketmq-console-ng,之后通过mavne进行编译获取jar,命令如下:

    mvn clean package -Dmaven.test.skip=true
    java -jar target/rocketmq-console-ng-1.0.0.jar
    

    得到rocketmq-console-ng-1.0.0.jar之后,找到rocketmq-console-ng-1.0.0.jarBOOT-INFclassesapplication.properties文件,根据自己的NamesrvAddr进行修改rocketmq.config.namesrvAddr的值。

    直接启动:

    java -jar rocketmq-console-ng-1.0.0.jar
    

    管控台
    管控台是基于springboot的,的确springboot非常方便和非常火了,所以有必要去学习下springboot了(其实还是spring系列,所以spring也必要深入学习下),稍后通过管控台进行观察运行。

    6、运行观察

    一个好的习惯是先运行Consumer,之后在运行Producer,之后通过rocketmq-console-ng管控台观察

    运行中截图

    运行完成之后,的确broker-a的数据加上broker-b的数据量就等于我们发送的数据量,而且slave的数量也master的数量也是一致的,效果如下:

    运行完成

    查看发送这些数据,2台机器的磁盘情况如下:
    rocketmq1占用磁盘空间

    rocketmq2占用磁盘空间

    到目前位置,关于RocketMQ快速入门就结束了,未完待续……

    如果读完觉得有收获的话,欢迎点赞加关注。


    个人公众号,欢迎关注,查阅更多精彩历史!!!
    匠心零度公众号

  • 相关阅读:
    php分页类
    jquery Deferred对象
    sql 常见面试题
    C# socket编程第二篇
    Sqlserver本地服务启动不了,提示请求失败或服务未及时响应
    Sqlserver函数之log,power,len
    C#除法
    C# socket编程第一篇
    android菜鸟进修之路一layout里添加xml文件没有在R.java里生成ID
    form提交时应注意“&”符号
  • 原文地址:https://www.cnblogs.com/jiangxinlingdu/p/7892593.html
Copyright © 2011-2022 走看看