zoukankan      html  css  js  c++  java
  • RocketMQ部署

    1.解压后用maven编译

    unzip rocketmq-all-4.3.0-source-release.zip
    cd rocketmq-all-4.3.0/
    mvn -Prelease-all -DskipTests clean install -U
    cd distribution/target/apache-rocketmq
    # 若当前位于源码包解压目录的上一级,可改用完整路径(与上一条 cd 二选一):cd rocketmq-all-4.3.0/distribution/target/apache-rocketmq

     配置maven环境变量

    export MAVEN_HOME=/usr/local/maven3
    export PATH=${PATH}:${MAVEN_HOME}/bin

    2.修改内存

    vim bin/runbroker.sh
    vim bin/runserver.sh
    JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"

    3.启动名称服务器

    nohup sh bin/mqnamesrv &
    tail -f ~/logs/rocketmqlogs/namesrv.log

    4.启动Broker

    nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true&
    tail -f ~/logs/rocketmqlogs/broker.log

    5.发送接收测试

    export NAMESRV_ADDR=localhost:9876
    sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
    # 发送成功时输出类似:SendResult [sendStatus=SEND_OK, ...]
    sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

    6.关闭服务器

    sh bin/mqshutdown broker
    sh bin/mqshutdown namesrv

    7.java测试

    <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.1.0-incubating</version>
    </dependency>

    package rocketmq;

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.junit.Test;

    import java.util.List;

    /**
     * Manual smoke tests that send messages to, and consume messages from, a RocketMQ broker.
     *
     * <p>Both tests need a reachable NameServer; change {@link #NAMESRV_ADDR} to your own
     * environment before running them.
     */
    public class RocketMQTest {

        /** NameServer address shared by producer and consumer; change to your own. */
        private static final String NAMESRV_ADDR = "192.168.121.130:9876";

        /** Topic the producer writes to and the consumer subscribes to. */
        private static final String TOPIC = "TopicTest";

        /** Number of messages sent by {@link #mqSendTest()}. */
        private static final int MESSAGE_COUNT = 997892;

        /** Pause after a failed send before moving on to the next message. */
        private static final long SEND_RETRY_PAUSE_MILLIS = 1000L;

        /** How long {@link #mqConsumer()} keeps the consumer running before shutting it down. */
        private static final long CONSUME_WAIT_MILLIS = 60 * 1000L;

        /**
         * Sends {@link #MESSAGE_COUNT} synchronous messages to {@link #TOPIC} and prints each result.
         *
         * @throws InterruptedException if the thread is interrupted while sending or pausing
         * @throws MQClientException if the producer cannot be started
         */
        @Test
        public void mqSendTest() throws InterruptedException, MQClientException {
            DefaultMQProducer producer = new DefaultMQProducer("producer_demo");
            producer.setNamesrvAddr(NAMESRV_ADDR);

            // A producer must be started before use, and only once:
            // never call start() for every message that is sent.
            producer.start();
            try {
                for (int i = 0; i < MESSAGE_COUNT; i++) {
                    try {
                        // Build the message: topic, tag, body.
                        Message msg = new Message(
                                TOPIC,
                                "TagA",
                                ("测试RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

                        // Send synchronously and print the broker's answer.
                        SendResult sendResult = producer.send(msg);
                        System.out.printf("%s%n", sendResult);
                    } catch (InterruptedException e) {
                        // Stop sending instead of swallowing the interrupt and looping on.
                        throw e;
                    } catch (Exception e) {
                        e.printStackTrace();
                        Thread.sleep(SEND_RETRY_PAUSE_MILLIS);
                    }
                }
            } finally {
                // Release the producer even when the loop is aborted.
                producer.shutdown();
            }
        }

        /**
         * Subscribes to every tag of {@link #TOPIC}, prints each received message body, keeps the
         * consumer running for {@link #CONSUME_WAIT_MILLIS} and then shuts it down.
         *
         * @throws MQClientException if subscribing or starting the consumer fails
         */
        @Test
        public void mqConsumer() throws MQClientException {
            // The consumer group is an important concept; it is only named here.
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_demo");

            // NameServer address; separate several addresses with ';'.
            consumer.setNamesrvAddr(NAMESRV_ADDR);

            // On the very first start, consume from the head of the queue. On later starts the
            // consumer continues from the position it reached last time.
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.subscribe(TOPIC, "*");

            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
                    try {
                        for (MessageExt msg : msgs) {
                            // Decode with the same charset the producer used to encode.
                            String msgbody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                            System.out.println(" MessageBody: " + msgbody);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        // Ask the broker to redeliver later.
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    // Consumed successfully.
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });

            consumer.start();
            System.out.printf("Consumer Started.%n");
            try {
                // Without this wait the test method returns right after start(), before the
                // listener has had a chance to be invoked.
                Thread.sleep(CONSUME_WAIT_MILLIS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag; the signature cannot declare the exception.
                Thread.currentThread().interrupt();
            } finally {
                consumer.shutdown();
            }
        }
    }

    中文名:惠凡

    博客名:淹死的鱼o0

    转载时请说明出处:http://www.cnblogs.com/huifan/

  • 相关阅读:
    mysql perl 抓取update语句
    $/ 改变换行符
    $/ 改变换行符
    java读取jpg图片旋转按比例缩放
    感应器
    lisp分支
    鸡肋的Drools
    postgre去重复记录
    拖拽到指定位置
    base64coder调用
  • 原文地址:https://www.cnblogs.com/huifan/p/9460731.html
Copyright © 2011-2022 走看看