zoukankan      html  css  js  c++  java
  • RocketMQ入门教程

    一、RocketMQ 是什么

         Github 上关于 RocketMQ 的介绍:RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。具有以下特性:

    1. 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型

    2. 在一个队列中可靠的先进先出(FIFO)和严格的顺序传递

    3. 支持拉(pull)和推(push)两种消息模式

    4. 单一队列百万消息的堆积能力

    5. 支持多种消息协议,如 JMS、MQTT 等

    6. 分布式高可用的部署架构,满足至少一次消息传递语义

    7. 提供 docker 镜像用于隔离测试和云集群部署

    8. 提供配置、指标和监控等功能丰富的 Dashboard

    对于这些特性描述,大家简单过一眼就即可,深入学习之后自然就明白了。

    专业术语

    Producer

    消息生产者,生产者的作用就是将消息发送到 MQ,生产者本身既可以产生消息,如读取文本信息等。也可以对外提供接口,由外部应用来调用接口,再由生产者将收到的消息发送到 MQ。

    Producer Group

    生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。在这里可以不用关心,只要知道有这么一个概念即可。

    Consumer

    消息消费者,简单来说,消费 MQ 上的消息的应用程序就是消费者,至于消息是否进行逻辑处理,还是直接存储到数据库等取决于业务需要。

    Consumer Group

    消费者组,和生产者类似,消费同一类消息的多个 consumer 实例组成一个消费者组。

    Topic

    Topic 是一种消息的逻辑分类,比如说你有订单类的消息,也有库存类的消息,那么就需要进行分类,一个是订单 Topic 存放订单相关的消息,一个是库存 Topic 存储库存相关的消息。

    Message

    Message 是消息的载体。一个 Message 必须指定 topic,相当于寄信的地址。Message 还有一个可选的 tag 设置,以便消费端可以基于 tag 进行过滤消息。也可以添加额外的键值对,例如你需要一个业务 key 来查找 broker 上的消息,方便在开发过程中诊断问题。

    Tag

    标签可以被认为是对 Topic 进一步细化。一般在相同业务模块中通过引入标签来标记不同用途的消息。

    Broker

    Broker 是 RocketMQ 系统的主要角色,其实就是前面一直说的 MQ。Broker 接收来自生产者的消息,储存以及为消费者拉取消息的请求做好准备。

    Name Server

    Name Server 为 producer 和 consumer 提供路由信息。

    二、RocketMQ 架构

     

    由这张图可以看到有四个集群,分别是 NameServer 集群、Broker 集群、Producer 集群和 Consumer 集群:
    1. NameServer: 提供轻量级的服务发现和路由。 每个 NameServer 记录完整的路由信息,提供等效的读写服务,并支持快速存储扩展。
    2. Broker: 通过提供轻量级的 Topic 和 Queue 机制来处理消息存储,同时支持推(push)和拉(pull)模式以及主从结构的容错机制。
    3. Producer:生产者,产生消息的实例,拥有相同 Producer Group 的 Producer 组成一个集群。
    4. Consumer:消费者,接收消息进行消费的实例,拥有相同 Consumer Group 的Consumer 组成一个集群。

         简单说明一下图中箭头含义,从 Broker 开始,Broker Master1 和 Broker Slave1 是主从结构,它们之间会进行数据同步,即 Date Sync。同时每个 Broker 与NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer 中。

        Producer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 建立长连接,且定时向 Broker 发送心跳。Producer 只能将消息发送到 Broker master,但是 Consumer 则不一样,它同时和提供 Topic 服务的 Master 和 Slave建立长连接,既可以从 Broker Master 订阅消息,也可以从 Broker Slave 订阅消息。

    附加说明:

        (1)NameServer是一个几乎无状态的节点,可集群部署,节点之间无任何信息同步

        (2)Broker部署相对复杂,Broker氛围Master与Slave,一个Master可以对应多个Slaver,但是一个Slaver只能对应一个Master,Master与Slaver的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slaver。Master可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有的NameServer

        (3)Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Produce完全无状态,可集群部署

        (4)Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slaver建立长连接,且定时向Master、Slaver发送心跳。Consumer即可从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定

     

    三、RocketMQ 集群部署模式

       0.  单 master 模式

            也就是只有一个 master 节点,称不上是集群,一旦这个 master 节点宕机,那么整个服务就不可用,适合个人学习使用。

      1. 多 master 模式
        多个 master 节点组成集群,单个 master 节点宕机或者重启对应用没有影响。
        优点:所有模式中性能最高
        缺点:单个 master 节点宕机期间,未被消费的消息在节点恢复之前不可用,消息的实时性就受到影响。
        注意:使用同步刷盘可以保证消息不丢失,同时 Topic 相对应的 queue 应该分布在集群中各个节点,而不是只在某各节点上,否则,该节点宕机会对订阅该 topic 的应用造成影响。
      2. 多 master 多 slave 异步复制模式
        在多 master 模式的基础上,每个 master 节点都有至少一个对应的 slave。master
        节点可读可写,但是 slave 只能读不能写,类似于 mysql 的主备模式。
        优点: 在 master 宕机时,消费者可以从 slave 读取消息,消息的实时性不会受影响,性能几乎和多 master 一样。
        缺点:使用异步复制的同步方式有可能会有消息丢失的问题。
      3. 多 master 多 slave 同步双写模式
        同多 master 多 slave 异步复制模式类似,区别在于 master 和 slave 之间的数据同步方式。
        优点:同步双写的同步模式能保证数据不丢失。
        缺点:发送单个消息 RT 会略长,性能相比异步复制低10%左右。
        刷盘策略:同步刷盘和异步刷盘(指的是节点自身数据是同步还是异步存储)
        同步方式:同步双写和异步复制(指的一组 master 和 slave 之间数据的同步)
        注意:要保证数据可靠,需采用同步刷盘和同步双写的方式,但性能会较其他方式低。

    四、入门程序

    1. 下载

    http://rocketmq.apache.org/dowloading/releases/

    选择Binary包

    这里解压缩后放到了D盘, 且为了操作方便, 把文件夹从rocketmq-all-4.5.0-bin-release改名为RocketMQ

    默认设置占用内存很大, 如果不是土豪配置需要修改一下

    NameServer设置: D:RocketMQin unserver.cmd

    修改为512m 512m 256m即可

    Broker设置: D:RocketMQin unbroker.cmd

    2. 设置环境变量

    桌面的计算机上点击右键 -> 属性

    3. 启动NameServer

    在D:RocketMQin目录下启动命令行, 执行 runserver.cmd

    4. 启动Broker

    执行mqbroker.cmd -n localhost:9876

    注意图中, Broker注册的地址是192.168.12.22:10911, 接下来需要用到

    5. 创建Topic

    执行mqadmin.cmd updateTopic -n localhost:9876 -b 192.168.12.22:10911 -t demo

    其中用-b指定Broker, 即步骤4中显示的地址

    开启MQ这三个步骤如果出现闪退、找不到主类等问题,可以参考文末连接,有详细解决方案

    6. 开发生产者Producer

    建立一个SpringBoot项目, 添加依赖

    <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
    <dependency>
       <groupId>org.apache.rocketmq</groupId>
       <artifactId>rocketmq-client</artifactId>
       <version>4.4.0</version>
    </dependency>

    创建ProducerService

    package com.mq.rocketmq.service;
    
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;
    
    @Service
    public class ProducerService {
    
    
        private DefaultMQProducer producer = null;
    
        @PostConstruct
        public void initMQProducer() {
            producer = new DefaultMQProducer("defaultGroup");
            producer.setNamesrvAddr("localhost:9876");
            producer.setRetryTimesWhenSendFailed(3);
    
            try {
                producer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
        }
    
        public boolean send(String topic, String tags, String content) {
            Message msg = new Message(topic, tags, "", content.getBytes());
            try {
                producer.send(msg);
                return true;
            } catch (Exception e) {
                e.printStackTrace();
            }
            return false;
        }
    
        @PreDestroy
        public void shutDownProducer() {
            if(producer != null) {
                producer.shutdown();
            }
        }
    
    }

    测试

    package com.mq.rocketmq;
    
    import com.mq.rocketmq.service.ProducerService;
    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import static org.junit.jupiter.api.Assertions.assertTrue;
    
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    class RocketmqApplicationTests {
    
        @Autowired
        private ProducerService producerService;
    
        @Test
        public void contextLoads() {
            boolean result = producerService.send("demo", "TAG-A", "Hello RocketMQ2");
            assertTrue(result);
    
        }
    
    }

    7. 开发消费者

    创建ConsumerService

    package com.mq.rocketmq.service;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;
    import java.util.List;
    
    @Service
    public class ConsumerService {
    
        private DefaultMQPushConsumer consumer = null;
    
        @PostConstruct
        public void initMQConsumer() {
            consumer = new DefaultMQPushConsumer("defaultGroup");
            consumer.setNamesrvAddr("localhost:9876");
            try {
                consumer.subscribe("demo", "*");
                consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                    @Override
                    public ConsumeConcurrentlyStatus consumeMessage(
                            List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                        for (MessageExt msg : msgs) {
                            System.out.println("Message Received: " + new String(msg.getBody()));
    
                        }
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                });
                consumer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
        }
    
        @PreDestroy
        public void shutDownConsumer() {
            if (consumer != null) {
                consumer.shutdown();
            }
    
        }
    }

    启动项目

    原文出处:

    https://www.cnblogs.com/zhangwuji/p/8134522.html

    https://blog.csdn.net/autfish/article/details/89226461

    参考:

    windows下安装rocketmq采坑全记录, https://blog.csdn.net/kobewwf24/article/details/82712461




  • 相关阅读:
    重写与重载的区别
    UDP模式与TCP模式的区别
    什么是GC?为什么会有GC?
    centos 7-8 安装 ms sql server 2019
    Phaser3 游戏开发入门——自定义构建Phaser库
    Visual Studio 下C#编译器在解析属性名时如果增加一个get_[您的另一个已经包含在类中属性名]的属性会报错,微软大哥这是什么鬼?
    Visual Studio 2015 Update 3 ISO
    react项目中引用amap
    js 截取网址中的某一段字符串
    解决react下找不到原生高德地图AMap类的问题
  • 原文地址:https://www.cnblogs.com/ryelqy/p/14317528.html
Copyright © 2011-2022 走看看