zoukankan      html  css  js  c++  java
  • RocketMQ笔记1-简介-单点模式-生产者消费者的使用-工作流程

    简介


    单机版安装

    • 通过docker安装RocketMQ Server + Broker + Console,至少需要 2G 内存

    • docker-compose.yml 如下:

    version: '3.5'services:
      rmqnamesrv:
        image: foxiswho/rocketmq:server
        container_name: rmqnamesrv
        ports:
          - 9876:9876
        volumes:
          - ./data/logs:/opt/logs
          - ./data/store:/opt/store
        networks:
            rmq:
              aliases:
                - rmqnamesrv
    
      rmqbroker:
        image: foxiswho/rocketmq:broker
        container_name: rmqbroker
        ports:
          - 10909:10909
          - 10911:10911
        volumes:
          - ./data/logs:/opt/logs
          - ./data/store:/opt/store
          - ./data/brokerconf/broker.conf:/etc/rocketmq/broker.conf
        environment:
            NAMESRV_ADDR: "rmqnamesrv:9876"
            JAVA_OPTS: " -Duser.home=/opt"
            JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
        command: mqbroker -c /etc/rocketmq/broker.conf
        depends_on:
          - rmqnamesrv
        networks:
          rmq:
            aliases:
              - rmqbroker
    
      rmqconsole:
        image: styletang/rocketmq-console-ng
        container_name: rmqconsole
        ports:
          - 8080:8080
        environment:
            JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
        depends_on:
          - rmqnamesrv
        networks:
          rmq:
            aliases:
              - rmqconsole
    
    networks:
      rmq:
        name: rmq
        driver: bridge
    
    • broker.conf

    RocketMQ Broker 需要一个配置文件,按照上面的 Compose 配置,我们需要在 ./data/brokerconf/ 目录下创建一个名为 broker.conf 的配置文件,内容如下:

    
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    #  Unless required by applicable law or agreed to in writing, software
    #  distributed under the License is distributed on an "AS IS" BASIS,
    #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    #  See the License for the specific language governing permissions and
    #  limitations under the License.
    
    
    # 所属集群名字
    brokerClusterName=DefaultCluster
    
    # broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a,
    # 在 broker-b.properties 使用: broker-b
    brokerName=broker-a
    
    # 0 表示 Master,> 0 表示 Slave
    brokerId=0
    
    # nameServer地址,分号分割
    # namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    
    # 启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
    # 解决方式1 加上一句 producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
    # brokerIP1=192.168.198.123
    
    # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    
    # 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false
    autoCreateTopicEnable=true
    
    # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    
    # Broker 对外服务的监听端口
    listenPort=10911
    
    # 删除文件时间点,默认凌晨4点
    deleteWhen=04
    
    # 文件保留时间,默认48小时
    fileReservedTime=120
    
    # commitLog 每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    
    # ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    
    # destroyMapedFileIntervalForcibly=120000
    # redeleteHangedFileInterval=120000
    # 检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    # 存储路径
    # storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
    # commitLog 存储路径
    # storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
    # 消费队列存储
    # storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
    # 消息索引存储路径
    # storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
    # checkpoint 文件存储路径
    # storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
    # abort 文件存储路径
    # abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
    # 限制的消息大小
    maxMessageSize=65536
    
    # flushCommitLogLeastPages=4
    # flushConsumeQueueLeastPages=2
    # flushCommitLogThoroughInterval=10000
    # flushConsumeQueueThoroughInterval=60000
    
    # Broker 的角色
    # - ASYNC_MASTER 异步复制Master
    # - SYNC_MASTER 同步双写Master
    # - SLAVE
    brokerRole=ASYNC_MASTER
    
    # 刷盘方式
    # - ASYNC_FLUSH 异步刷盘
    # - SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH
    
    # 发消息线程池数量
    # sendMessageThreadPoolNums=128
    # 拉消息线程池数量
    # pullMessageThreadPoolNums=128
    
    • 启动
    docker-compose up -d
    
    • 访问控制台
    http://ip:8080
    

    生产者的使用

    • 创建生产者对象DefaultMQProducer
    • 设置NamesrvAddr
    • 启动生产者服务
    • 创建消息并发送
    package com.wu.mq.quickstart;
    
    import com.wu.mq.constants.Const;
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    /**
     * @author :wuba
     * @date :Created in 2019/10/27 21:38
     * @description:消息生产者
     */
    
    public class Producer {
        public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
            //创建生产者对象
            DefaultMQProducer producer = new DefaultMQProducer("test_producerGroup");
            //设置Name Server addr
            producer.setNamesrvAddr(Const.NAMESRV_ADDR);
            //启动服务
            producer.start();
            //创建消息并发送
            for (int i = 0; i < 5; i++) {
                Message message = new Message("test_topic", "TagA", "Key" + i, ("Hello,RocketMQ" + i).getBytes());
                SendResult sr = producer.send(message);
                System.out.println(sr);
            }
            //关闭
            producer.shutdown();
        }
    }
    

    消费者的使用

    • 创建消费者对象DefaultMQPushConsumer
    • 设置NamesrvAddr 及消费位置ConsumeFromWhere
    • 进行订阅主题subscribe
    • 注册监听并消费 registerMessageListener
    package com.wu.mq.quickstart;
    
    import com.wu.mq.constants.Const;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import java.util.List;
    
    /**
     * @author :wuba
     * @date :Created in 2019/10/27 21:39
     * @description:消息消费者
     */
    
    public class Consumer {
        public static void main(String[] args) throws MQClientException {
            //创建消费者对象
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumerGroup");
            //设置NamesrvAddr 及消费位置ConsumeFromWhere
            consumer.setNamesrvAddr(Const.NAMESRV_ADDR);
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            //订阅topic,  * 指该主题下的所有消息都能消费
            consumer.subscribe("test_topic","*");
            //注册监听并消费
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    MessageExt msg = list.get(0);
                    try {
                        String topic = msg.getTopic();
                        String tags = msg.getTags();
                        String keys = msg.getKeys();
                        if("Key3".equals(keys)){
                            System.out.println("消息消费失败...");
                            int i=1/0;
                        }
                        String msgBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                        System.err.println("topic: " + topic + ",tags: " + tags + ", keys: " + keys + ",body: " + msgBody);
                    }catch (Exception e){
                        e.printStackTrace();
                        int reconsumeTimes = msg.getReconsumeTimes();
                        System.out.println("reconsumeTimes :"+reconsumeTimes);
                        if(reconsumeTimes == 2){
                            //记录日志,补偿处理
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            //启动消费服务
            System.out.println("消费服务启动...");
            consumer.start();
        }
    }
    

    RocketMQ拓扑图


    • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

    • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。

    • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

    • Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。


    工作流程

    • 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。

    • Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。

    • 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。

    • Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。

    • Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。


    参考

     RocketMQ开发者指南

  • 相关阅读:
    qt学习笔记(1):qt点击运行没有反应。
    JS Object类型
    JS Boolean数据类型和数据类型转换规律
    CSS雪碧图
    CSS
    PS基础
    JS number数字类型
    js中的变量和数据类型
    JS 基础
    单词
  • 原文地址:https://www.cnblogs.com/wuba/p/11771848.html
Copyright © 2011-2022 走看看