zoukankan      html  css  js  c++  java
  • CentOS7 安装 RocketMQ 实践和小示例

    CentOS7 安装 RocketMQ 实践和小示例

    1、通过 SSH 工具(比如 XShell)连接到 CentOS7 服务器上;

    2、进入到 /usr/local 目录中:
      cd /usr/local

    3、下载二进制版的 rocketmq:
      wget http://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.4.0/rocketmq-all-4.4.0-bin-release.zip

    4、将下载下来的 rocketmq-all-4.4.0-bin-release.zip 解压:
      unzip rocketmq-all-4.4.0-bin-release.zip
    得到
      rocketmq-all-4.4.0-bin-release

    5、将 rocketmq-all-4.4.0-bin-release 更名为 rocketmq-all-4.4.0:
      mv rocketmq-all-4.4.0-bin-release rocketmq-all-4.4.0

    6、进入到 rocketmq-all-4.4.0 目录中:
      cd rocketmq-all-4.4.0/bin

    7、打开【环境配置】文件 profile:
      vim /etc/profile
    在文件末尾加入如下配置:
      export ROCKET_MQ_HOME=/usr/local/rocketmq-all-4.4.0
      export NAMESRV_ADDR=localhost:9876
      export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$MAVEN_HOME/bin:$ROCKET_MQ_HOME/bin:$PATH
      保存并关闭 profile:
      按 ESC,输入 qw! 回车
      使 profile 立即生效:
        source /etc/profile

    8、我们先看下 rocketmq 当前的状态:
      ps aux | grep rocketmq

    9、在当前目录中创建客户端配置文件 broker.properties:
      sh mqbroker -m > broker.properties

    10、修改配置文件 broker.properties,将 brokerIP1 的值改为我们机器的“公网 IP”,这里是关键,并将本次生成此配置文件的时间数据删除:
      namesrvAddr=localhost:9876
      brokerIP1=公网IP
      brokerName=iZ2zehfhto8e2w4t58a282Z
      brokerClusterName=DefaultCluster
      brokerId=0
      autoCreateTopicEnable=true
      autoCreateSubscriptionGroup=true
      msgTraceTopicName=RMQ_SYS_TRACE_TOPIC
      traceTopicEnable=false
      rejectTransactionMessage=false
      fetchNamesrvAddrByAddressServer=false
      transactionTimeOut=6000
      transactionCheckMax=15
      transactionCheckInterval=60000
      aclEnable=false
      storePathRootDir=/root/store
      storePathCommitLog=/root/store/commitlog
      flushIntervalCommitLog=500
      commitIntervalCommitLog=200
      flushCommitLogTimed=false
      deleteWhen=04
      fileReservedTime=72
      maxTransferBytesOnMessageInMemory=262144
      maxTransferCountOnMessageInMemory=32
      maxTransferBytesOnMessageInDisk=65536
      maxTransferCountOnMessageInDisk=8
      accessMessageInMemoryMaxRatio=40
      messageIndexEnable=true
      messageIndexSafe=false
      haMasterAddress=
      brokerRole=ASYNC_MASTER
      flushDiskType=ASYNC_FLUSH
      cleanFileForciblyEnable=true
      transientStorePoolEnable=false

    11、创建日志的存放目录(如果目录已存在则忽略):
      mkdir -p /data/log/rocketMQ

    12、启动服务端:
      nohup sh mqnamesrv > /data/log/rocketMQ/server.log &

    13、启动客户端;

      nohup sh mqbroker -n localhost:9876 -c broker.properties autoCreateTopicEnable=true > /data/log/rocketMQ/client.log &
      注意:这里的地址是 localhost:9876 而非公网 IP。

    14、看下 rocketmq 的状态:
      jps
      出现如下则表明服务端和客户端均启动成功:
      2992 Jps
      28282 BrokerStartup
      27484 NamesrvStartup
      也可以通过 ps aux | grep rocketmq 来查看。

    15、rocketmq 的配置类:
    package com.smbea.rocketMQ;

    /**
    * rocketMQ 配置
    * @author hapday
    * @date 2019年5月15日 @time 下午10:47:14
    * @since 0.0.1
    *
    */
    public class RocketMQConfig {

    public static final String IP = "10.11.12.13"; // 公网 IP 地址
    public static final int PORT = 9876; // 端口,默认端口 9876
    public static final String GROUP_NAME = "rocketMQ-group-hapday"; // 组名
    public static final String INSTANCE_NAME = "rocketMQ-instance-hapday"; // 实例名
    public static final String TOPIC_NAME = "rocketMQ-topic-hapday"; // 主题名
    public static final String TAG_NAME = "rocketMQ-tag-hapday"; // 标签名

    }

    16、数据生产者:
    package com.smbea.rocketMQ;

    import com.alibaba.rocketmq.client.exception.MQBrokerException;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    import com.alibaba.rocketmq.remoting.exception.RemotingException;
    import com.artup.common.utils.CommonUtils;

    import lombok.extern.slf4j.Slf4j;

    /**
    * 数据生产者
    * @author hapday
    * @date 2019年5月15日 @time 下午10:48:13
    * @since 0.0.1
    */
    @Slf4j
    public class RocketMQProducer {

    public static void main(String[] args) {
    DefaultMQProducer producer = new DefaultMQProducer(RocketMQConfig.GROUP_NAME);
    producer.setNamesrvAddr(RocketMQConfig.IP + ":" + RocketMQConfig.PORT);
    producer.setInstanceName(RocketMQConfig.INSTANCE_NAME);
    producer.setVipChannelEnabled(false); // 关闭 VIP 通道

    try {
    producer.start(); // 启动【生产者】
    } catch (MQClientException e) {
    log.error("", e);
    }

    Message message = new Message();
    message.setTopic(RocketMQConfig.TOPIC_NAME);
    message.setTags(RocketMQConfig.TAG_NAME);

    SendResult sendResult = null;
    String content = CommonUtils.getCurrentDateAndTime().concat(" 大家好!我是消息队列 RocketMQ - "); // 消息的内容

    for (int index = 0; index < 10; index++) {
    message.setBody( content.concat(String.valueOf(index)).getBytes() );

    try {
    sendResult = producer.send(message); // 发送消息
    } catch (MQClientException e) {
    log.error("", e);
    } catch (RemotingException e) {
    log.error("", e);
    } catch (MQBrokerException e) {
    log.error("", e);
    } catch (InterruptedException e) {
    log.error("", e);
    }

    log.debug("响应结果:{}", sendResult);
    }

    producer.shutdown(); // 关闭【生产者】
    }

    }

    17、数据消费者:
    package com.smbea.rocketMQ;

    import java.util.List;

    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.message.MessageExt;

    import lombok.extern.slf4j.Slf4j;

    /**
    * 数据消费者
    * @author hapday
    * @date 2019年5月15日 @time 下午10:48:32
    * @since 0.0.1
    *
    */
    @Slf4j
    public class RocketMQConsumer {

    public static void main(String[] args) {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMQConfig.GROUP_NAME);
    consumer.setNamesrvAddr(RocketMQConfig.IP + ":" + RocketMQConfig.PORT);
    consumer.setInstanceName(RocketMQConfig.INSTANCE_NAME);
    consumer.setVipChannelEnabled(false); // 关闭 VIP 通道

    try {
    consumer.subscribe(RocketMQConfig.TOPIC_NAME, RocketMQConfig.TAG_NAME); // 订阅
    } catch (MQClientException e) {
    log.error("", e);
    }

    consumer.registerMessageListener(new MessageListenerConcurrently() {

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtendList, ConsumeConcurrentlyContext context) {
    for (MessageExt messageExtend : messageExtendList) {
    log.debug("消息内容:{}", new String(messageExtend.getBody()));
    }
    // log.debug("context = {}", context);

    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    });

    try {
    consumer.start(); // 启动【消费者】
    } catch (MQClientException e) {
    log.error("", e);
    }

    log.debug("【消费者】已启动。");
    }

    }

    18、关闭服务:
      18.1 关闭 broker 服务: sh mqshutdown broker
      18.2 关闭 namesrv: sh mqshutdown namesrv

  • 相关阅读:
    C++ Low level performance optimize
    简单find命令的实现
    数据结构学习之栈
    随机数的生成
    数据结构学习(一)
    C复习---动态内存分配
    (转)虚拟文件系统(VFS)浅析
    (转) 中断处理程序&中断服务例程
    Gradle系列教程之依赖管理
    Gradle系列教程之依赖管理
  • 原文地址:https://www.cnblogs.com/hapday/p/10873068.html
Copyright © 2011-2022 走看看