zoukankan      html  css  js  c++  java
  • RocketMq消息队列使用

    最近在看消息队列框架 ,alibaba的RocketMQ单机支持1万以上的持久化队列,支持诸多特性,

    目前RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景

    比kafka还是有过之无不及,其实kafka文档很丰富

    但RocketMQ网上的文章太少,找不到相关的操作教程

    于是研究了下源码 做个单机操作的教程,如果你也对此有兴趣不妨共同研究

    下载源码的地址 https://github.com/alibaba/RocketMQ/releases

    • 首选通过在java项目里面Maven依赖方式引用RocketMQ Java SDK

      <dependency>
          <groupId>com.alibaba.rocketmq</groupId>
          <artifactId>rocketmq-client</artifactId>
          <version>3.2.6</version>
      </dependency>

    Downloads

    在linux 下用wget 下载源码然后解压出来

    在runserver.sh里面可以配置 jvm启动的参数 JAVA_OPT_1="-server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"  

    可以 vi runserver.sh

    分别给 mqnamesrv mqbroker play.sh 执行的权限

    chmod +x  mqnamersrv 

    chmod +x  mqbroker 

    chmod +x  play.sh 

    下面红线框的这段 命令输入错误了,忽略不用看

    通过 nohup sh mqnamesrv& 启动 RocketMq

    目前没看到结束的命令,也没找到相关的介绍,

    我这里用的 ps -ef|grep rocketmq  查到进程pid

    然后kill pid号

    或则pkill -9 java [慎用]

    用jps -v 查看下java进程的参数

     rocketmq启动后监听 9876端口,这里还是在看源码里面看到的,资料实在是太少了

    在防火墙配置里面加上 9876端口,设置iptables对外开放

    部署Broker 

    nohup sh mqbroker -n "127.0.0.1:9876" -c ../conf/2m-2s-async/broker-a.properties & 

    这里ip换成本机的就是单机实例,如果配置主从 这里可以配其他的ip

     Master和Slave的配置文件参考conf目录下的配置文件

     Master与Slave通过指定相同的brokerName参数来配对,Master的BrokerId必须是0,Slave的BrokerId必须是大于0的数

     一个Master下面可以挂载多个Slave,同一Master下的多个Slave通过指定不同的BrokerId来区分

     部署一Master一Slave,集群采用异步复制方式:

     Master: nohup sh mqbroker -n "192.168.1.23:9876" -c ../conf/2m-2s-async/broker-a.properties &  

    Slave:   nohup sh mqbroker -n "192.168.1.23:9876" -c ../conf/2m-2s-async/broker-a-s.properties &  

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    package com.pgsqlmybatis.common.rocketmq;/*
    ***************************************************************
    * 公司名称    :
    * 系统名称    :信用管家专业版
    * 类 名 称    :Ios渠道idfa统计,推广统计用
    * 功能描述    :
    * 业务描述    :
    * 作 者 名    :@Author Royal
    * 开发日期    :2016-05-15
    * Created     :IntelliJ IDEA
    ***************************************************************
    * 修改日期    :
    * 修 改 者    :
    * 修改内容    :
    ***************************************************************
    */
     
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
     
    public class Producer {
        public static void main(String[] args) {
            DefaultMQProducer producer = new DefaultMQProducer("Producer");
            producer.setNamesrvAddr("xxxxxxxxxx:9876");
            try {
                producer.start();
     
                String pushMsg="kafka activeMq rocketMq 消息队列使用1";
                Message msg = new Message("PushTopic","push","1",
                        pushMsg.getBytes("UTF-8"));
     
                SendResult result = producer.send(msg);
                System.out.println("id:" + result.getMsgId() +
                        " result:" + result.getSendStatus());
     
                String pushMsg2="海量级消息记录单机测试2";
                msg = new Message("PushTopic","push","2",pushMsg2.getBytes("UTF-8"));
     
                result = producer.send(msg);
                System.out.println("id:" + result.getMsgId() +
                        " result:" + result.getSendStatus());
     
                String pushMsg3="海量级消息记录单机测试3";
                msg = new Message("PullTopic","pull","1",pushMsg3.getBytes());
     
                result = producer.send(msg);
                System.out.println("id:" + result.getMsgId() +
                        " result:" + result.getSendStatus());
            catch (Exception e) {
                e.printStackTrace();
            finally {
                producer.shutdown();
            }
        }
    }

      

    启动生成者

    启动消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    package com.pgsqlmybatis.common.rocketmq;/*
    ***************************************************************
    * 公司名称    :
    * 系统名称    :信用管家专业版
    * 类 名 称    :Ios渠道idfa统计,推广统计用
    * 功能描述    :
    * 业务描述    :
    * 作 者 名    :@Author Royal
    * 开发日期    :2016-05-15
    * Created     :IntelliJ IDEA
    ***************************************************************
    * 修改日期    :
    * 修 改 者    :
    * 修改内容    :
    ***************************************************************
    */
     
    import java.io.UnsupportedEncodingException;
    import java.util.List;
     
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.Message;
    import com.alibaba.rocketmq.common.message.MessageExt;
     
    public class Consumer {
        public static void main(String[] args){
            DefaultMQPushConsumer consumer =
                    new DefaultMQPushConsumer("PushConsumer");
            consumer.setNamesrvAddr("xxxxxxxxxxxx:9876");
            try {
                consumer.subscribe("PushTopic""push");
                /**
                 * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
                 * 如果非第一次启动,那么按照上次消费的位置继续消费
                 */
                consumer.setConsumeFromWhere(
                        ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                consumer.registerMessageListener(
                        new MessageListenerConcurrently() {
                            public ConsumeConcurrentlyStatus consumeMessage(
                                    List<MessageExt> list,
                                    ConsumeConcurrentlyContext Context) {
                                Message msg = list.get(0);
                                System.out.println(msg.toString());
                                String recString= null;
                                try {
                                    recString = new String(msg.getBody() ,"UTF-8");
                                catch (UnsupportedEncodingException e) {
                                    e.printStackTrace();
                                }
                                System.out.println(recString);
                                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                            }
                        }
                );
                consumer.start();
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

       

    以上为单机实例配置

    如果你遇到什么问题可以私信我,如果觉得此文对你很有帮助,点下赞推荐下额^_^ 

    参考:http://blog.csdn.net/a19881029/article/details/34446629

            http://sofar.blog.51cto.com/353572/1540874

            http://blog.csdn.net/loongshawn/article/details/51086876

            RocketMq最佳实践

           《RocketMQ原理简介》

           分布式开放消息系统(RocketMQ)的原理与实践      

           《RocketMQ用户指南》

  • 相关阅读:
    Linux中逻辑卷的快照与还原
    Linux 中磁盘阵列RAID10损坏以及修复
    Linux 中磁盘阵列RAID10配置
    Linux 中磁盘容量配额
    Centos7VMware虚拟机最小化安装后,安装Tenda U12 USB无线网卡驱动
    安装vmware-tools遇the path "" is not valid path to the gcc binary和the path "" is not a valid path to the 3.10.0-327.e17.x86_64 kernel headers问题解决
    /etc/postfix下 main.cf 配置文件详解
    Linux安装配置vsftp搭建FTP的详细配置
    Linux中ftp的常用命令
    javascript深入理解js闭包
  • 原文地址:https://www.cnblogs.com/xumaojun/p/8523317.html
Copyright © 2011-2022 走看看