zoukankan      html  css  js  c++  java
  • RocketMQ

    一、安装准备

    1)修改配置

    修改runserver文件:

    vi runserver.sh

    1597716798394

    修改为如下所示:

    1597716847927

    修改runbroker文件:

    vi runbroker.sh

    1597716879152

    修改为:

    1597716909819

    2)启动rocketmq

    启动nameserver

    # 前台启动
    ./mqnamesrv
    # 后台启动
    nohup ./mqnamesrv > /dev/null 2>&1 &
    

    启动broker

    方式一:通过“-n”指定nameserver

    # 前台启动
    ./mqbroker -n 192.168.0.112:9876 autoCreateTopicEnable=true
    # 后台启动:端口必须指定9876
    nohup ./mqbroker -n 192.168.0.112:9876 autoCreateTopicEnable=true > /dev/null 2>&1 & 
    

    方式二:通过“-c‘配置文件

    # vi /opt/apps/rocketmq-4.7.1/conf/broker.conf
    brockerIP1=192.168.0.112
    namesrvAddr=192.168.0.112:9876
    brockerName=zomicc
    # 前台启动
    ./mqbroker -c  /opt/apps/rocketmq-4.7.1/conf/broker.conf
    # 后台启动
    nohup ./mqbroker -c  /opt/apps/rocketmq-4.7.1/conf/broker.conf > /dev/null 2>&1 & 
    

    命令行测试

    # 编辑profile文件
    vi /etc/profile
    # 设置nameserver服务器
    export NAMESRV_ADDR=172.17.0.2:9876
    # 刷新
    source /etc/profile
    # 测试消息发送命令
    sh tools.sh org.apache.rocketmq.example.quickstart.Producer
    # 测试消息接收命令
    sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
    

    3)安装管理工具

    A:docker安装

    # 拉取镜像 
    docker pull styletang/rocketmq-console-ng:1.0.0 
    # 创建并启动容器 (后台启动可加上-di参数)
    单机或者集群都只需用指定一个地址:
    docker run --name rmq-console -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.0.112:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8082:8080 -t styletang/rocketmq-console-ng:1.0.0
    指定多个地址也不会报错:
    docker run --name rmq-console -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.0.112:9876;192.168.0.112:9877 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8082:8080 -t styletang/rocketmq-console-ng:1.0.0
    

    B:非docker安装

    从https://github.com/apache/rocketmq-externals下载rocketmq-externals,进入rocketmq-console。

    修改rocketmq-console/src/main/resources/application.properties中的rocketmq.config.namesrvAddr。

    1597834305301

    mvn clean package -Dmaven.test.skip=true
    java -jar target/rocketmq-console-ng-2.0.0.jar
    

    浏览器访问http://192.168.0.112:8082/,看到如下图证明管理工具设置成功。

    1597834730501

    4)集群搭建2m2s

    基于方便起见以及对docker的练习,直接采用docker安装。

    注意:配置文件在容器中的/etc/rocketmq/brocker.conf中。

    两个可使用便捷命令

    docker cp ./broker.conf rmqbroker01:/etc/rocketmq/broker.conf
    docker cp rmqbroker01:/etc/rocketmq/broker.conf  ./
    

    创建两个nameserver

    # nameserver1 
    docker create -p 9876:9876 --name rmqserver01  
    -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m"  
    -e "JAVA_OPTS=-Duser.home=/opt"  
    foxiswho/rocketmq:server-4.5.1
    ====================================================
    # nameserver2 
    docker create -p 9877:9876 --name rmqserver02  
    -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m"  
    -e "JAVA_OPTS=-Duser.home=/opt"  
    foxiswho/rocketmq:server-4.5.1 
    

    创建两个master brocker

    # master broker01 
    docker create --net host --name rmqbroker01  
    -e "JAVA_OPTS=-Duser.home=/opt"  
    -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m"  
    foxiswho/rocketmq:broker-4.5.1 
    # 配置 
    namesrvAddr=192.168.0.112:9876;192.168.0.112:9877
    brokerClusterName=testCluster 
    brokerName=broker01
    brokerId=0 #0是master非0是slave
    deleteWhen=04 #删除文件时间点,默认凌晨 4点
    fileReservedTime=48 #文件保留时间,默认48小时
    brokerRole=SYNC_MASTER #Broker 的角色 ASYNC_MASTER 异步复制Master SYNC_MASTER 同步双写Master 							从机SLAVE
    flushDiskType=ASYNC_FLUSH #刷盘模式 ASYNC_FLUSH异步,SYNC_FLUSH同步
    brokerIP1=192.168.0.112
    brokerIp2=192.168.0.112
    listenPort=10911
    ================================
    # master broker02 
    docker create --net host --name rmqbroker02  
    -e "JAVA_OPTS=-Duser.home=/opt"  
    -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m"  
    foxiswho/rocketmq:broker-4.5.1 
    # 配置
    namesrvAddr=192.168.0.112:9876;192.168.0.112:9877
    brokerClusterName=testCluster 
    brokerName=broker02
    brokerId=0
    deleteWhen=04
    fileReservedTime=48 
    brokerRole=SYNC_MASTER 
    flushDiskType=ASYNC_FLUSH 
    brokerIP1=192.168.0.112 
    brokerIp2=192.168.0.112 
    listenPort=10811 
    

    创建两个slave brocker

    # slave broker03 
    docker create --net host --name rmqbroker03  
    -e "JAVA_OPTS=-Duser.home=/opt"  
    -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m"  
    foxiswho/rocketmq:broker-4.5.1 
    # 配置
    namesrvAddr=192.168.0.112:9876;192.168.0.112:9877
    brokerClusterName=testCluster
    brokerName=broker01 
    brokerId=1 
    deleteWhen=04 
    fileReservedTime=48 
    brokerRole=SLAVE 
    flushDiskType=ASYNC_FLUSH 
    brokerIP1=192.168.0.112 
    brokerIp2=192.168.0.112 
    listenPort=10711
    =========================
    # slave broker04
    docker create --net host --name rmqbroker04  
    -e "JAVA_OPTS=-Duser.home=/opt"  
    -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m"  
    foxiswho/rocketmq:broker-4.5.1 
    # 配置 
    namesrvAddr=192.168.0.112:9876;192.168.0.112:9877
    brokerClusterName=testCluster 
    brokerName=broker02 
    brokerId=1 
    deleteWhen=04 
    fileReservedTime=48 
    brokerRole=SLAVE 
    flushDiskType=ASYNC_FLUSH 
    brokerIP1=192.168.0.112 
    brokerIp2=192.168.0.112 
    listenPort=10611 
    

    启动容器

    #启动容器 
    docker start rmqserver01 rmqserver02 
    docker start rmqbroker01 rmqbroker02 rmqbroker03 rmqbroker04
    

    可能出现的错

    java.net.BindException: Address already in use
    可能原因1:端口相同了。
    可能原因2:4个broker在同一台机器上,listenPort间隔太近了。可适当调大一点。
    
    The Name Server Address[192.168.0.112:9876;192.168.0.112:9877 ] illegal, please set it as follows, "127.0.0.1:9876;192.168.0.1:9876"
    是因为多了一个非法的空格。注意删除一切无用的尾部空格。
    

    浏览器访问http://192.168.0.112:8082/,看到如下图才证明集群搭建成功。否则检查相关配置(尤其注意一下多余空格。比如一个集群名为“testCluster”,一个为“testCluster ”,这样也是有问题的)

    1597834543640

    systemctl firewalld.service stop
    

    二、集群架构

    网络架构图总览

    https://github.com/apache/rocketmq/tree/master/docs/cn

    1598087523796

    消息存储

    ConsumeQueue(offset)+CommitLog

    负载均衡

    从队列角度看:Topic 和 Queue 是 1 对多的关系,一个 Topic 下可以包含多个 Queue,主要用于负
    载均衡。发送消息时,用户只指定 Topic,Producer 会根据 Topic 的路由信息选择具体发到
    哪个 Queue 上。Consumer 订阅消息时,会根据负载均衡策略决定订阅哪些 Queue 的消息。

    从集群角度说:一个队列下的Queue会分片到不同的broker上,可以降低单Master的压力。

    从主从配置说:从机也支持消息消费,可以降低主机的压力。

    负载均衡策略都有哪些?

    consumer.setAllocateMessageQueueStrategy(实现AllocateMessageQueueStrategy接口的策略);
    

    (1)平均分配算法

    假如有10个队列,4个消费者,则分配规则是:10除4等于2余2。则每个消费者先分配2个Queue,余数的2个依次分给Consumer1和Consumer2。

    1597843353089

    (2)环形分配算法

    1597843063127

    (3)就近机房算法

    1597843124743

    刷盘策略

    (1)同步刷盘 SYNC_FLUSH

    返回成功状态时,消息已经被写入磁盘。

    消息写入内存 pagecache 后,立即通知刷盘线程,刷盘完成后,返回消息写成功的状态。

    (2)异步刷盘 ASYNC_FLUSH

    返回成功状态时,消息只是被写入内存 pagecache,写操作返回快,吞吐量达大,当内存里的消息积累到一定程度时,统一出发写磁盘动作,快速写入。

    复制策略

    (1)同步复制 SYNC_MASTER

    master 和 slave 都写成功后返回成功状态。好处是如果master出故障,slave上有全部备份,容易恢复。缺点是增大延迟,降低吞吐量。

    (2)异步复制 ASYNC_MASTER

    只要 master 写成功就返回成功状态。好处是低延迟、高吞吐,缺点是如果 master 出故障,数据没有写入 slave,就会有丢失。

    推荐 ASYNC_FLUSH + SYNC_MASTER。在消息响应和消息可靠性之间做一个平衡。

    三、基础用法

    请参考:https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md

    消息发送

    同步发送:应用于可靠性要求比较高的场景。比如:重要的消息通知,短信通知等。

    1598068347093

    异步发送:通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

    1598068417386

    单向发送:主要用在不特别关心发送结果的场景,例如日志发送。

    1598068435338

    消息接收

    注册监听器接收消息

    1598068489299

    广播模式、集群模式

    同一个消费组内:

    集群模式下的同一个Message只会被一个消费者消费;

    广播模式下的同一个Message会被所有消费者消费。

    // 集群模式 (默认)
    consumer.setMessageModel(MessageModel.CLUSTERING); 
    // 广播模式 
    consumer.setMessageModel(MessageModel.BROADCASTING);
    

    消息标签

    可以为消息设置标签,方便灵活分类、消费过滤等。(Topic可以看做是一级分类,Tag可以看做是二级分类)

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
    consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
    

    四、常见问题分析

    顺序消息

    场景举例:订单确认->付款成功->通知发货->签收成功

    单个Queue中,消息的存储是有序的。要想保证消息的顺序性,需要保证消息发送到broker的有序性和消息接收处理完成的有序性。如何实现:

    ①生产者采用同步单线程发送消息至同一个Queue

    public class OrderlyProducer {
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("p-GroupOrder01");
            producer.setNamesrvAddr("192.168.0.112:9876");
            producer.start();
    
            List<String> order1 = new ArrayList<>();
            order1.add("订单1---创建");
            order1.add("订单1---付款");
            order1.add("订单1---发货");
            order1.add("订单1---签收");
            order1.add("订单1---完成");
            List<String> order2 = new ArrayList<>();
            order2.add("订单2---创建");
            order2.add("订单2---付款");
            order2.add("订单2---发货");
            order2.add("订单2---签收");
            order2.add("订单2---完成");
            resolveOrder(producer, order1, 1L);
            resolveOrder(producer, order2, 2L);
    
            producer.shutdown();
        }
    
        public static void resolveOrder(DefaultMQProducer producer, List<String> orderDetail,
                                        Long orderID) throws Exception{
            int size = orderDetail.size();
            for (int i = 0; i < size; i++) {
                Message msg = new Message("TopicTest", orderDetail.get(i).getBytes("UTF-8"));
                SendResult send = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Long id = (Long) arg;
                        long index = id % mqs.size();//根据订单id选择发送queue
                        return mqs.get((int) index);
                    }
                }, orderID);
            }
        }
    }
    

    ②消费者采用MessageListenerOrderly监听器

    MessageListenerOrderly的作用是通过锁的方式,保证同一时刻只有一个线程能从同一个Queue中拉取消息。

    而不是一个Queue只有一个处理线程。

    public class OrderConsumer {
        public static void main(String[] args) throws Exception{
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("c-GroupOrder01");
            consumer.setNamesrvAddr("192.168.0.112:9876");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.subscribe("TopicTest", "*");
            // 注册回调实现类来处理从broker拉取回来的消息
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    context.setAutoCommit(true);
                    for (MessageExt msg : msgs) {
                        // 可以看到订单对每个queue(分区)有序
                        System.out.println("consumeThread=" + Thread.currentThread().getName()
                                + " queueId=" + msg.getQueueId() + ", content:"
                                + new String(msg.getBody()));
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
            consumer.start();
            System.out.println("Consumer Started.");
        }
    }
    

    延迟消息

    场景举例:唯品会购物车倒计时取消订单。30min后去检查订单的状态,如果还是未付款就取消订单释放库存。

    设置消息延迟级别:

    // String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
     message.setDelayTimeLevel(3);
    

    事务消息

    场景举例:Bob向Smith转账

    1598091392963

    A、B都会有问题。主要原因在于发消息与扣款非原子性。通过事务消息可以解决。

    1598091479276

    1598093771820

    代码示例:

    public class TransactionProducer {
        public static void main(String[] args) throws MQClientException {
            TransactionMQProducer producer = new TransactionMQProducer("p-TranGroup");
            producer.setNamesrvAddr("192.168.0.112:9876");
            producer.setTransactionListener(new TransactionListenerImpl());// 设置事务监听
            producer.start();
            for (int i = 0; i < 10; i++) {
                Message msg = new Message("TranTopic", ("hello transaction " + i).getBytes());
                TransactionSendResult result = producer.sendMessageInTransaction(msg, null);
                System.out.println(result);
            }
            producer.shutdown();
        }
    }
    
    class TransactionListenerImpl implements TransactionListener {
    
        @Override
        public LocalTransactionState executeLocalTransaction(Message message, Object o) {
            // 这里调用本地事务的执行
            try {
                System.out.println("start local transaction....");
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return LocalTransactionState.UNKNOW;
        }
    
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
            // 这里进行本地事务状态的查询
            int status = new Random().nextInt(3);
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                default:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
    }
    

    消息去重

    正常情况下出现重复消息的概率其实很小,如果由消息系统来实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题(提示:MessageID),这也是RocketMQ不解决消息重复的问题的原因。

    高可用、高可靠

    ①多master部署,防止单节点故障,以此保障rocketmq的可用性。

    ②Producer的send方法本身支持内部重试,重试逻辑如下:1)至多重试3次;2)如果发送失败,则轮转到下一个Broker;3)这个方法的总耗时不超过producer.setSendMsgTimeout()设置的最大值,默认10s。这仍然无法保证消息100%成功,为保证消息一定成功,可以再send同步发送失败时,将消息存储到db,有后台任务去处理。以此保证rocketmq的可靠性。

    "我们所要追求的,永远不是绝对的正确,而是比过去的自己更好"
  • 相关阅读:
    mac 鼓捣php 多版本切换
    thinkPHP 导出excel 发布正式环境net::ERR_INVALID_RESPONSE
    js 计时显示 倒着 正者 都行
    LNMP 下 php.ini 文件修改后不生效
    Jquery 遍历数组之$().each方法与$.each()方法介绍
    js 去掉字符串最后一个逗号
    js拼接字符串时,字符串首出现undefined的问题
    PHP 暂停函数 sleep() 与 usleep() 的区别
    在IDEA里创建web项目,以及web 项目部署
    spring容器和springmvc容器,以及web容器的关系
  • 原文地址:https://www.cnblogs.com/zomicc/p/13531183.html
Copyright © 2011-2022 走看看