  • RocketMQ: Pitfalls for First-Time Users

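    This post walks through installing RocketMQ on CentOS and running a simple example. The steps below are the ones that worked for me; following Alibaba's official instructions as written did not get me a working setup.

    Installation

    The official quick start is at https://rocketmq.apache.org/docs/quick-start/
    My installation steps were as follows: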

    # Download RocketMQ 4.4.0 (the latest release at the time of writing)
    wget http://apache-mirror.8birdsvideo.com/rocketmq/4.4.0/rocketmq-all-4.4.0-bin-release.zip
    
    # Unpack
    unzip rocketmq-all-4.4.0-bin-release.zip
    
    # Switch to the RocketMQ directory
    cd rocketmq-all-4.4.0-bin-release
    
    # Start the name server
    nohup ./bin/mqnamesrv -n 111.231.XX.XX:9876 &
    
    # Start the broker. Pass -c conf/broker.conf autoCreateTopicEnable=true, otherwise topics have to be created manually
    nohup sh bin/mqbroker -n 111.231.XX.XX:9876 -c conf/broker.conf autoCreateTopicEnable=true &
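
    To confirm that both processes came up, one option is to look for the startup line in their logs. The sketch below assumes the default log location used by the official quick start (~/logs/rocketmqlogs/) and that a successful start writes a line containing "boot success"; check_boot is a hypothetical helper name, not part of RocketMQ.

```shell
# Print whether a RocketMQ log file contains the startup success line.
# $1: label to print, $2: path to the log file (assumed location, adjust as needed)
check_boot() {
  if grep -q "boot success" "$2" 2>/dev/null; then
    echo "$1: started"
  else
    echo "$1: not started (or log not found at $2)"
  fi
}

check_boot "name server" "$HOME/logs/rocketmqlogs/namesrv.log"
check_boot "broker" "$HOME/logs/rocketmqlogs/broker.log"
```

    If either line reports "not started", the nohup.out file in the RocketMQ directory is the next place to look.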
    

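    Configuration: switch to the bin directory of the RocketMQ installation (the path below is relative to where the archive was unpacked)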

    cd rocketmq-all-4.4.0-bin-release/bin
    

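    The RocketMQ startup scripts default to a JVM heap of at least 4g. If the machine does not have that much memory, edit the JAVA_OPT line in runserver.sh and runbroker.sh as follows: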

    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
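
    The same edit can be scripted. This is a minimal sketch, assuming the heap flags in both scripts have the form -Xms<size>, -Xmx<size> and -Xmn<size>; shrink_heap is a hypothetical helper, and the demo line only mimics the stock setting rather than quoting it.

```shell
# Rewrite the -Xms/-Xmx/-Xmn values in a startup script, keeping a .bak copy.
# $1: script path, $2: size for -Xms and -Xmx, $3: size for -Xmn
shrink_heap() {
  sed -i.bak -E \
    -e "s/-Xms[0-9]+[kmgKMG]/-Xms$2/" \
    -e "s/-Xmx[0-9]+[kmgKMG]/-Xmx$2/" \
    -e "s/-Xmn[0-9]+[kmgKMG]/-Xmn$3/" \
    "$1"
}

# Demonstration on a throwaway file (the content is an assumption, not the real script)
demo=$(mktemp)
echo 'JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g"' > "$demo"
shrink_heap "$demo" 256m 125m
cat "$demo"
```

    Against the real scripts the call would be shrink_heap runserver.sh 256m 125m, then the same for runbroker.sh, run from the bin directory.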
    

    Running

    Running the official demo fails with the following error:

    21:20:22.249 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[] result: true
    org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
    	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:640)
    	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1310)
    	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1256)
    	at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:339)
    	at org.apache.rocketmq.example.simple.Producer.main(Producer.java:40)
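
    A send timeout like this often means the client cannot reach the broker at the address the broker registered with the name server. A quick reachability check from the client machine can help narrow it down. The sketch below assumes bash and the coreutils timeout command are available, and that the default ports are in use (9876 for the name server, 10911 for the broker); port_open and BROKER_HOST are hypothetical names.

```shell
# Return 0 if a TCP connection to host $1, port $2 succeeds within 3 seconds.
port_open() {
  timeout 3 bash -c "exec 3<>/dev/tcp/$1/$2" 2>/dev/null
}

# Set BROKER_HOST to the server's public IP; defaults to the local machine
host="${BROKER_HOST:-127.0.0.1}"
for port in 9876 10911; do
  if port_open "$host" "$port"; then
    echo "$host:$port reachable"
  else
    echo "$host:$port unreachable"
  fi
done
```

    If both ports are reachable and the send still times out, the address the broker advertises is the more likely culprit.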
    

    Run the following commands to inspect the broker configuration and write the host's public IP address into it:

    # Print the broker configuration
    sh ./bin/mqbroker -m
    
    # Stop the broker
    sh bin/mqshutdown broker
    
    # Append this machine's public IP to the config file that the broker loads with -c
    echo 'brokerIP1=111.231.XX.XX' >> conf/broker.conf
    
    # Restart the broker
    nohup sh bin/mqbroker -n 111.231.XX.XX:9876 -c conf/broker.conf autoCreateTopicEnable=true &
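
    To set brokerIP1 without clobbering other settings or leaving duplicate entries on a second run, the edit can be made idempotent. A minimal sketch: set_broker_ip is a hypothetical helper, the demo file only mimics a broker config, and the 192.0.2.x addresses are documentation placeholders.

```shell
# Set brokerIP1 in a broker config file, replacing an existing entry if present.
# $1: config file path, $2: public IP of the broker host
set_broker_ip() {
  if grep -Eq '^brokerIP1[[:space:]]*=' "$1"; then
    sed -i.bak -E "s/^brokerIP1[[:space:]]*=.*/brokerIP1=$2/" "$1"
  else
    echo "brokerIP1=$2" >> "$1"
  fi
}

# Demonstration on a throwaway file
demo=$(mktemp)
echo 'brokerName = broker-a' > "$demo"
set_broker_ip "$demo" 192.0.2.10   # no entry yet, so this appends
set_broker_ip "$demo" 192.0.2.20   # entry exists, so this replaces it
cat "$demo"
```

    The real call would be set_broker_ip conf/broker.conf followed by the server's public IP, made while the broker is stopped.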
    

    Installing the Management Console

    Git repository: https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

    git clone git@github.com:apache/rocketmq-externals.git
    cd rocketmq-externals/rocketmq-console/
    mvn clean package -Dmaven.test.skip=true
    
    

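    After packaging, run the following command from the directory that contains the jar (Maven writes it to target/):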

    java -jar rocketmq-console-ng-1.0.1.jar --server.port=12181 --rocketmq.config.namesrvAddr=111.231.XX.XX:9876
    
    

    Open http://localhost:12181 to access the console.

    Querying on the Producer page throws the following exception:

    java.lang.RuntimeException: org.apache.rocketmq.client.exception.MQBrokerException: CODE: 1  DESC: the producer group[] not exist
    For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
            at com.google.common.base.Throwables.propagate(Throwables.java:160)
            at org.apache.rocketmq.console.service.impl.ProducerServiceImpl.getProducerConnection(ProducerServiceImpl.java:38)
            at org.apache.rocketmq.console.controller.ProducerController.producerConnection(ProducerController.java:39)
    
    

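    This happens because the demo producer has already shut down by the time the console looks up its group. Comment out producer.shutdown() in the demo code so the producer stays connected; in production code, keep the call.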

     //producer.shutdown();
    

    Code Examples (official)

    Producer

    package org.apache.rocketmq.example.simple;
    
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    public class Producer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
    
            DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
    
            producer.setNamesrvAddr("111.231.XX.XX:9876");
            producer.start();
    
            for (int i = 0; i < 10; i++) {
                try {
                    // Arguments: topic, tag, key, body
                    Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
            // Left commented out so the console can still find the producer group; restore it in production
            //producer.shutdown();
        }
    }
    
    

    Consumer

    package org.apache.rocketmq.example.simple;
    
    import java.util.List;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    
    public class PushConsumer {
    
        public static void main(String[] args) throws InterruptedException, MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
            consumer.subscribe("TopicTest", "*");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            //wrong time format 2017_0422_221800
            //consumer.setConsumeTimestamp("20181109221800");
            consumer.setNamesrvAddr("111.231.XX.XX:9876");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.printf("Consumer Started.%n");
        }
    }
    
    

  • Original article: https://www.cnblogs.com/pingyun/p/11629616.html