zoukankan      html  css  js  c++  java
  • RocketMQ部署

    1.解压后用maven编译

    unzip rocketmq-all-4.3.0-source-release.zip
    cd rocketmq-all-4.3.0/
    mvn -Prelease-all -DskipTests clean install -U
    cd distribution/target/apache-rocketmq
    cd rocketmq-all-4.3.0/distribution/target/apache-rocketmq

     配置mavne环境变量

    export MAVEN_HOME=/usr/local/maven3
    export PATH=${PATH}:${MAVEN_HOME}/bin

    2.修改内存

    vim bin/runbroker.sh
    vim bin/runserver.sh
    JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"

    3.启动名称服务器

    nohup sh bin/mqnamesrv -n localhost:9876 autoCreateTopicEnable=true&
    tail -f ~/logs/rocketmqlogs/namesrv.log

    4.启动经纪人

    nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true&
    tail -f ~/logs/rocketmqlogs/broker.log

    5.发送接收测试

    export NAMESRV_ADDR=localhost:9876
    sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer SendResult [sendStatus=SEND_OK]
    sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

    6.关闭服务器

    sh bin/mqshutdown broker
    sh bin/mqshutdown namesrv

    7.java测试

    <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.1.0-incubating</version>
    </dependency>

    package rocketmq; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.junit.Test; import java.util.List; public class RocketMQTest { @Test public void mqSendTest() throws InterruptedException, MQClientException { DefaultMQProducer producer = new DefaultMQProducer("producer_demo"); //指定NameServer地址 //producer.setNamesrvAddr("10.125.20.39:9876"); //修改为自己的 producer.setNamesrvAddr("192.168.121.130:9876"); //修改为自己的 /** * Producer对象在使用之前必须要调用start初始化,初始化一次即可 * 注意:切记不可以在每次发送消息时,都调用start方法 */ producer.start(); for (int i = 0; i < 997892; i++) { try { //构建消息 Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("测试RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); //发送同步消息 SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } @Test public void mqConsumer() throws MQClientException { /** * Consumer Group,非常重要的概念,后续会慢慢补充 */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_demo"); //指定NameServer地址,多个地址以 ; 隔开 consumer.setNamesrvAddr("10.125.20.39:9876"); //修改为自己的 /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 * 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt msg : msgs) { String msgbody = new String(msg.getBody(), "utf-8"); System.out.println(" MessageBody: " + msgbody);//输出消息内容 } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功 } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }

    中文名:惠凡

    博客名:淹死的鱼o0

    转载时请说明出处:http://www.cnblogs.com/huifan/

  • 相关阅读:
    uva 10369 Arctic Network
    uvalive 5834 Genghis Khan The Conqueror
    uvalive 4848 Tour Belt
    uvalive 4960 Sensor Network
    codeforces 798c Mike And Gcd Problem
    codeforces 796c Bank Hacking
    codeforces 768c Jon Snow And His Favourite Number
    hdu 1114 Piggy-Bank
    poj 1276 Cash Machine
    bzoj 2423 最长公共子序列
  • 原文地址:https://www.cnblogs.com/huifan/p/9460731.html
Copyright © 2011-2022 走看看