zoukankan      html  css  js  c++  java
  • RocketMQ部署

    1.解压后用maven编译

    unzip rocketmq-all-4.3.0-source-release.zip
    cd rocketmq-all-4.3.0/
    mvn -Prelease-all -DskipTests clean install -U
    cd distribution/target/apache-rocketmq
    cd rocketmq-all-4.3.0/distribution/target/apache-rocketmq

     配置mavne环境变量

    export MAVEN_HOME=/usr/local/maven3
    export PATH=${PATH}:${MAVEN_HOME}/bin

    2.修改内存

    vim bin/runbroker.sh
    vim bin/runserver.sh
    JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"

    3.启动名称服务器

    nohup sh bin/mqnamesrv -n localhost:9876 autoCreateTopicEnable=true&
    tail -f ~/logs/rocketmqlogs/namesrv.log

    4.启动经纪人

    nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true&
    tail -f ~/logs/rocketmqlogs/broker.log

    5.发送接收测试

    export NAMESRV_ADDR=localhost:9876
    sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer SendResult [sendStatus=SEND_OK]
    sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

    6.关闭服务器

    sh bin/mqshutdown broker
    sh bin/mqshutdown namesrv

    7.java测试

    <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.1.0-incubating</version>
    </dependency>

    package rocketmq; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.junit.Test; import java.util.List; public class RocketMQTest { @Test public void mqSendTest() throws InterruptedException, MQClientException { DefaultMQProducer producer = new DefaultMQProducer("producer_demo"); //指定NameServer地址 //producer.setNamesrvAddr("10.125.20.39:9876"); //修改为自己的 producer.setNamesrvAddr("192.168.121.130:9876"); //修改为自己的 /** * Producer对象在使用之前必须要调用start初始化,初始化一次即可 * 注意:切记不可以在每次发送消息时,都调用start方法 */ producer.start(); for (int i = 0; i < 997892; i++) { try { //构建消息 Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("测试RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); //发送同步消息 SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } @Test public void mqConsumer() throws MQClientException { /** * Consumer Group,非常重要的概念,后续会慢慢补充 */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_demo"); //指定NameServer地址,多个地址以 ; 隔开 consumer.setNamesrvAddr("10.125.20.39:9876"); //修改为自己的 /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 * 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt msg : msgs) { String msgbody = new String(msg.getBody(), "utf-8"); System.out.println(" MessageBody: " + msgbody);//输出消息内容 } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功 } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }

    中文名:惠凡

    博客名:淹死的鱼o0

    转载时请说明出处:http://www.cnblogs.com/huifan/

  • 相关阅读:
    模拟赛总结
    2018.04.06学习总结
    2018.04.06学习总结
    Java实现 LeetCode 672 灯泡开关 Ⅱ(数学思路问题)
    Java实现 LeetCode 671 二叉树中第二小的节点(遍历树)
    Java实现 LeetCode 671 二叉树中第二小的节点(遍历树)
    Java实现 LeetCode 671 二叉树中第二小的节点(遍历树)
    Java实现 LeetCode 670 最大交换(暴力)
    Java实现 LeetCode 670 最大交换(暴力)
    Java实现 LeetCode 670 最大交换(暴力)
  • 原文地址:https://www.cnblogs.com/huifan/p/9460731.html
Copyright © 2011-2022 走看看