zoukankan      html  css  js  c++  java
  • RabbitMQ

    [TOC]

    RabbitMQ

    1.MQ引言

    1.1什么是MQ MQ(Message Quene):翻译为消息队列,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耗。别名为消息中间件通过利用高效可靠的消息传递机制进行平台无关的数据通信来进行分布式系统的集成。 1.2 MQ有哪些 当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。 1.3不同MQ特点

    • ActiveMQ

    Apache出品,最流行的,能力强劲的开源消息总线。它是一个完全支持JMS规范的的消息中间件。丰富的API,多种集群架构模式让ActiveMQ在业界成为老牌的消息中间件,在中小型企业颇受欢迎!

    • Kafka

    LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。

    • RocketMQ

    阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。

    • RabbitMQ

    使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

    RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集

    2.RabbitMQ的引言

    2.1 RabbitMQ

    基于AMQP协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。

    官网

    官方文档

    • AMQP协议

    AMQP(advanced message queuing protocol)在2003年时被提出,最早用于解决金融领不同平台之间的消息传递交互问题。顾名思义,AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。以下是AMQP协议模型:

    image

    image

    安装rabbitMQ第一种方案(建议使用)

    1、下载并安装包

    安装:rpm -ivh 包名,注意顺序

    1. erlang-22.0.7-1.el7.x86_64.rpm
    2. socat-1.7.3.2-2.el7.x86_64.rpm
    3. rabbitmq-server-3.7.18-1.el7.noarch.rpm

    2、修改配置文件

    # 1、找到配置模板文件
    	/usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example
    # 或者执行命令找到路径
    	find / -name rabbitmq.config.example
    

    3、打开来宾用户权限

    4、开启web管理插件

    rabbitmq-plugins enable rabbitmq_management
    

    5、关闭防火墙或者打开5672和15672端口

    # 方案一(建议):打开两个指定端口
    	firewall-cmd --zone=public --add-port=15672/tcp
    	firewall-cmd --zone=public --add-port=5672/tcp
    # 方案二:关闭防火墙
    	systemctl stop firewalld
    

    防火墙及端口常用操作参考

    6、rabbitmq常用操作

    # 启动服务:
    	systemctl start rabbitmq-server.service
    	# 重启服务
    	systemctl restart rabbitmq-server.service
    # 停止服务:
    	systemctl stop rabbitmq-server.service
    # 查看状态服务:
    	systemctl status rabbitmq-server.service
    # rabbitmq管理控制命令与systemctl管理控制命令类似
    	rabbitmqctl -help
    

    可能遇到的问题

    # 问题一
    	undefined: There is no template at js/tmpl/undefined.ejs undefined
    # 处理方案:重启服务,使用IE浏览器访问服务
    	service rabbitmq-server restart
    # 问题二(如下图)处理方案:
    	rabbitmqctl stop
    	systemctl start rabbitmq-server.service 
    


    安装RabbitMQ第二种方案(不推荐)

    参考博客原文

    安装erlang依赖

    由于rabbitmq是基于erlang语言开发的,所以必须先安装erlang。

    以下操作起始路径为:/root

    # 1、安装依赖
    	yum -y install gcc glibc-devel make ncurses-devel openssl-devel xmlto perl wget gtk2-devel binutils-devel
    # 2、下载erlang官网 https://www.erlang.org/downloads 会比较慢,请耐心等待
    	# 此处使用第三方源下载
    	wget http://file.zhaobl.com/common/otp_src_22.0.tar.gz
    # 3、解压
    	tar -zxvf otp_src_22.0.tar.gz
    # 4、移动位置
    	mv otp_src_22.0 /usr/local/
    # 5、切换目录
    	cd /usr/local/otp_src_22.0/
    # 6、创建即将安装的目录
    	mkdir ../erlang
    # 7、配置安装路径
    	./configure --prefix=/usr/local/erlang
    

    可能会遇到如下警告,没大问题

    # 8、安装
    	make install
    # 9、查看一下是否安装成功
    	ll /usr/local/erlang/bin
    # 10、添加环境变量
    	echo 'export PATH=$PATH:/usr/local/erlang/bin' >> /etc/profile
    # 11、刷新环境变量
    	source /etc/profile
    # 12、甩一条命令测试erlong是否安装成功
    	erl
    # 退出erlong的shell
    	halt().
    

    安装RabbitMQ

    rabbitmq下载地址github

    以下操作起始路径为:/root

    # 1、下载(/root目录下执行)
    	wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.15/rabbitmq-server-generic-unix-3.7.15.tar.xz
    # 由于是tar.xz格式的所以需要用到xz,没有的话就先安装 `yum install -y xz`
    # 2、第一次解压
    	/bin/xz -d rabbitmq-server-generic-unix-3.7.15.tar.xz`
    # 3、第二次解压
    	tar -xvf rabbitmq-server-generic-unix-3.7.15.tar
    # 4、移走
    	mv rabbitmq_server-3.7.15/ /usr/local/
    # 5、改名
    	mv /usr/local/rabbitmq_server-3.7.15 /usr/local/rabbitmq
    # 6、配置环境变量
    	echo 'export PATH=$PATH:/usr/local/rabbitmq/sbin' >> /etc/profile
    # 7、刷新环境变量
    	source /etc/profile
    # 8、创建配置目录(本博客并未使用单独的配置文件,因此本步骤纯属多余。)
    	mkdir /etc/rabbitmq
    

    RabbitMQ启动命令

    # 启动rabbitmq:
    	systemctl start rabbitmq-server.service   打开服务
    	rabbitmq-server -detached
    # 停止rabbitmq:
    	systemctl stop rabbitmq-server.service
    	rabbitmqctl stop
    # 查看rabbitmq状态:
    	systemctl status rabbitmq-server.service
    	rabbitmqctl status
    # 关闭防火墙或者自行打开(5672和15672)端口
    	# 查看firewall防火墙状态
    	systemctl status firewalld			
    	# 关闭防火墙
        systemctl stop firewalld
    	# 打开防火墙
        systemctl start firewalld
    # 开启访问端口测试
    	# 查看已经开启的端口
    	firewall-cmd --list-ports
    	firewall-cmd --zone=public --add-port=15672/tcp
    	firewall-cmd --zone=public --add-port=5672/tcp
    	# 重启防火墙(会导致上方不是永久打开的端口关闭)
    	systemctl reload firewalld.service
    

    防火墙及端口配置参考

    RabbitMQ配置WEB管理

    # 开启web管理插件
    	rabbitmq-plugins enable rabbitmq_management
    # 访问
    	http://服务器地址:15672/
    

    默认账号密码:guest guest(这个账号只允许在localhost机器上访问)

    RabbitMQ配置用户管理

    # 查看所有用户
    	rabbitmqctl list_users
    # 添加一个用户
    	rabbitmqctl add_user 用户名 密码
    # 配置权限
    	rabbitmqctl set_permissions -p "/" 用户名 ".*" ".*" ".*"
    # 查看用户权限
    	rabbitmqctl list_user_permissions 用户名
    # 设置tag
    	rabbitmqctl set_user_tags 用户名 administrator
    # 删除用户(安全起见,删除默认用户)
    	rabbitmqctl delete_user guest
    

    登陆

    配置好用户后重启rabbit,然后用新账号进行登陆

    RabbitMQ配置

    1、命令行配置

    [root@localhost ~]# rabbitmq
    # 管理命令行,在不使用web管理界面主要使用的配置命令
        rabbitmqctl -help
    # 插件管理命令行
        rabbitmq-plugins
    # 服务启动相关命令 systemctl status|stop|restart
        rabbitmq-server   
    

    2、web管理界面

    RabbitMQ的hello word

    AMQP协议回顾

    消息模型

    参考

    1、在web管理界面中建立虚拟主机

    2、创建用户

    3、绑定虚拟主机

    第一种模型(直连)

    在上图的模型中,有以下概念

    • P:生产者,也就是要发送消息的程序

    • C:消费者:消息的接受者,会一直等待消息到来。

    • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

    • 引入依赖

        <!--引入rabbitMQ依赖-->
        <dependency>
          <groupId>com.rabbitmq</groupId>
          <artifactId>amqp-client</artifactId>
          <version>5.7.2</version>
        </dependency>
    

    1、开发消息生产者

    package com.jia;
    
    import com.jia.utils.RabbitMQ;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import org.junit.Test;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Provider {
        @Test
        public void testSendMessage() throws IOException, TimeoutException {
            /*
            // 创建工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //设置rabbitmq服务地址
            connectionFactory.setHost("192.168.8.8");
            //设置端口
            connectionFactory.setPort(5672);
            //设置虚拟主机
            connectionFactory.setVirtualHost("/ems");
            connectionFactory.setUsername("jia");
            connectionFactory.setPassword("123456");
            //获得连接
            Connection connection = connectionFactory.newConnection();
            */
    
            Connection connection = RabbitMQ.getConn();
            //创建访问队列的通道
            Channel channel = connection.createChannel();
            //通道绑定队列:durable是否将队列持久化;
            channel.queueDeclare("hello", false, false, false, null);
            //发布消息
            //向hello队列中添加一条消息;props:MessageProperties.PERSISTENT_TEXT_PLAIN   消息持久化
            channel.basicPublish("", "hello", null, "hello rabbitMQ".getBytes());
            /*
            //关闭通道
            channel.close();
            //关闭连接
            connection.close();
            */
            RabbitMQ.close(channel,connection);
        }
    }
    

    2、消费者

    package com.jia;
    
    import com.jia.utils.RabbitMQ;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Customer {
        public static void main(String[] args) throws IOException, TimeoutException {
            /*
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.8.8");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/ems");
            connectionFactory.setUsername("jia");
            connectionFactory.setPassword("123456");
            Connection connection = connectionFactory.newConnection();
            */
            //工具类获得连接
            Connection connection = RabbitMQ.getConn();
            //连接创建通道
            Channel channel = connection.createChannel();
            //通道绑定队列
            channel.queueDeclare("hello", false, false, false, null);
            //消费消息
            //提供一个默认的消费者使用通道来消费队列中消息
            channel.basicConsume("hello", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body)
                        throws IOException {
                    System.out.println(new String(body));
                }
            });
    
        }
    }
    

    3、工具类

    package com.jia.utils;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class RabbitMQ {
        private static final ConnectionFactory connectionFactory;
    
        static {
            connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.8.8");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/ems");
            connectionFactory.setUsername("jia");
            connectionFactory.setPassword("123456");
        }
    
        public static Connection getConn() {
            try {
                return connectionFactory.newConnection();
            } catch (Exception e) {
                e.printStackTrace();
            }
            return null;
        }
    
        public static void close(Channel channel, Connection connection) {
            try {
                if (channel != null) {
                    channel.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    第二种模型(work quene)

    Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

    ​ 此时就可以使用work模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

    角色:

    • P:生产者:任务的发布者
    • C1:消费者,领取任务并且完成任务,假设完成速度较慢
    • C2:消费者2:领取任务并完成任务,假设完成速度快

    注意:这种模型队列会将任务平均分配给每个消费者

    代码同上个模型类似此处就不一一列出

    消息自动确认机制是导致任务平均分配的原因,如果希望达到性能好的多执行,性能差的少执行需将通道的step1:autoAck参数置为false;

    step2:设置每次消费数据,消息确认行为

    step3:

    队列收到消费者确认的信号才会将该消息从队列中移除,确保了数据的完整性;

    第三种模型fanout(广播)

    fanout:也称广播

    场景:注册成功(生产者),向邮件任务队列,积分任务队列,短信任务队列派发任务。

    在广播模式下,消息发送流程是这样的:

    • 可以有多个消费者
    • 每个消费者有自己的queue(队列)
    • 每个队列都要绑定到Exchange(交换机)
    • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
    • 交换机把消息发送给绑定过的所有队列
    • 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

    1.开发生产者

    //声明交换机
    channel.exchangeDeclare("logs","fanout");//广播一条消息多个消费者同时消费
    //发布消息
    channel.basicPublish("logs","",null,"hello".getBytes());
    

    2.开发消费者-1

    //绑定交换机
    channel.exchangeDeclare("logs","fanout");
    //创建临时队列
    String queue = channel.queueDeclare( ).getQueue( ) ;
    //将临时队列绑定exchange
    channel.queueBind(queue,"logs","");
    //处理消息
    channel.basicConsume(queue,true,new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP. BasicProperties properties, byte[] body) throws IOException(
    System.out.println("消费者1:"+new String(body));
    });
    

    开发消费者-2、开发消费者-3、开发消费者-4、开发消费者-5、、、

    第四种模型Routing之订阅模型-Direct

    在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

    在Direct模型下:

    • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个Routing key(路由key)
    • 消息的发送方在向Exchange发送消息时,也必须指定消息的Routing Key
    • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,当队列的Routing key与消息的 Routing key完全一致,才会接收到消息流程

    图解:

    • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
    • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与routing key完全匹配的队列
    • C1:消费者,其所在队列指定了需要routing key为error的消息
    • C2:消费者,其所在队列指定了需要routing key为info、error、warning的消息

    1.开发生产者

    2、开发消费者

    第五种模型 Routing 之订阅模型-Topic

    Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑outingkey的时候使用通配符!这种模型Routingkey一般都是由一个或多个单词组成,多个单词之间以”分割,例如:item.ins

    # 统配符
    	* 匹配一个词
    	# 匹配一个或多个词
    # 单词分隔符
    	. 原点作为词与词之间的分隔
    # 如:
    	audit.#		匹配audit.irs或者audit.irs.corporate等
    	audit.* 	只能匹配 audit.irs
    

    1、生产者

    2、消费者


    springBoot简单整合RabbitMQ

    搭建springBoot初始环境——

    实现第一种简单模型

    1、引入依赖
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-bopt-starter-amqp</artifactId>
    </dependency>
    
    2、项目架构

    3、完整pom.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.3.8.RELEASE</version>
            <!--注意此语句设置相对路径-->
            <relativePath/>
        </parent>
        
        <groupId>com.jia</groupId>
        <artifactId>springBoot_rabbitMQ</artifactId>
        <version>1.0-SNAPSHOT</version>
        <description>springBoot与rabbitMQ消息中间件简单整合</description>
    
        <dependencies>
            <!--web依赖-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <!--amqp依赖-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <!--测试框架-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <!--amqp测试包-->
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    </project>
    
    4、配置配置文件
    spring
    	application
    		name:springboot_rabbitmq
    	rabbitmq:
    		host:10.15.0.9
    		port:5672
    		username : ems
    		password:123
    		virtual-host:/ems
    
    5、消费者HelloConsumer.java
    package com.jia.hello;
    
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    
    @Component   //  默认创建的消费者就是 持久化 非独占 不自动删除的队列:查看
    @RabbitListener(queuesToDeclare = @Queue("hello"))
    public class HelloConsumer {
    
        @RabbitHandler  // 相当于消费者接到队列中消息执行的回调
        public void receive(String message) {
            System.out.println("message = " + message);
        }
    }
    
    6、测试TestRabbitMQ.java
    package com.jia.test;
    
    import com.jia.APP;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    
    @SpringBootTest(classes = APP.class)
    @RunWith(SpringRunner.class)
    public class TestRabbitMQ {
    
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
    
        @Test
        public void test01() {
            rabbitTemplate.convertAndSend("hello", "hello word");
        }
    }
    
    

    work模型实现

    在前面的基础上

    生产者

        // work
    	@Test
        public void testWork(){
            for (int i = 0; i < 100; i++) {
                rabbitTemplate.convertAndSend("work","这是work模型"+i);
            }
        }
    

    消费者

    @Component
    public class WorkConsumer {
        
        // 第一个消费者
        @RabbitListener(queuesToDeclare = @Queue("work"))
        public void receive(String message) {
            System.out.println("message1 =========================================== " + message);
        }
    
        // 第二个消费者
        @RabbitListener(queuesToDeclare = @Queue("work"))
        public void receive2(String message) {
            System.out.println("message2 =========================================== " + message);
        }
    }
    

    非公平消费机制,自动实现能者多劳的机制

    第三种 fanout广播模型

    在前面代码基础上

    生产者

        // fanout 广播模型
        @Test
        public void testFanout(){
            rabbitTemplate.convertAndSend("logs","","Fanout的模型发送的消息");
        }
    

    消费者

    @Component
    public class FanoutConsumer {
    
        @RabbitListener(bindings = {  // 绑定队列
                @QueueBinding(value = @Queue, // 临时队列
                        exchange = @Exchange(value = "logs", type = ExchangeTypes.FANOUT))
        })
        public void receive(String message) {
            System.out.println("message1 = " + message);
        }
    
    
        @RabbitListener(bindings = {  // 绑定队列
                @QueueBinding(value = @Queue, // 临时队列
                        exchange = @Exchange(value = "logs", type = ExchangeTypes.FANOUT))
        })
        public void receive2(String message) {
            System.out.println("message2 = " + message);
        }
    }
    

    第四种路由模型

    生产者

        // 路由模型
        @Test
        public void testRouter() {
            rabbitTemplate.convertAndSend("directs", "info", "发送的info的key的路由信息");
            rabbitTemplate.convertAndSend("directs", "error", "发送的error的key的路由信息");
        }
    

    消费者

    @Component
    public class DirectConsumer {
    
    
        @RabbitListener(bindings = {
                @QueueBinding(value = @Queue,
                        exchange = @Exchange(value = "directs", type = ExchangeTypes.DIRECT),
                        key = {"info"})
        })
        public void receive(String message) {
            System.out.println("message = " + message);
        }
    
        // 消费者2
        @RabbitListener(bindings = {
                @QueueBinding(value = @Queue,
                        exchange = @Exchange(value = "directs", type = ExchangeTypes.DIRECT),
                        key = {"error"})
        })
        public void receive2(String message) {
            System.out.println("message2 = " + message);
        }
    
        // 消费者3
        @RabbitListener(bindings = {
                @QueueBinding(value = @Queue,
                        exchange = @Exchange(value = "directs", type = ExchangeTypes.DIRECT),
                        key = {"info", "error"})
        })
        public void receive3(String message) {
            System.out.println("message3 = " + message);
        }
    
    }
    

    第五种topic订阅模型

    生产者

        // topic模型,订阅模式
        @Test
        public void testTopic() {
            rabbitTemplate.convertAndSend("topic", "user.info", "发送topic的key-user.info的信息");
            rabbitTemplate.convertAndSend("topic", "user", "发送topic的key-user的信息");
        }
    

    消费者

    @Component
    public class TopicConsumer {
    
    
        @RabbitListener(bindings = {
                @QueueBinding(value = @Queue,
                        exchange = @Exchange(name = "topic", type = ExchangeTypes.TOPIC),
                        key = {"user.*"})
        })
        public void receive(String message) {
            System.out.println("message1 = " + message);
        }
    
    
        @RabbitListener(bindings = {
                @QueueBinding(value = @Queue,
                        exchange = @Exchange(name = "topic", type = ExchangeTypes.TOPIC),
                        key = {"user.#"})
        })
        public void receive2(String message) {
            System.out.println("message2 = " + message);
        }
    
    }
    

    RabbitMQ应用场景

    异步处理

    场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种

    1. 串行的方式
    2. 并行的方式
    • **串行方式:**将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西。

    • **并行方式:**将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间。

    • **消息队列:**假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并行已经提高的处理时间,但是,前面说过,邮件和短信对我正常的使用网站没有 任何影响,客户端没有必要等着其发送完成才显示注册成功,应该是写入数据库后就返回.消息队列:引入消息队列后,把发送邮件,短信不是必须辑异步处理

    应用解耦

    场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口.

    这种做法有一个缺点: 当库存系统出现故障时,订单就会失败。订单系统和库存系统高耦合引入消息队列

    订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。 库存系统:订阅下单的消息,获取下单消息,进行库存操作。就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失。

    流量削峰

    **场景:**秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。

    作用:

    1. 可以控制活动人数,超过此一定阀值的订单直接丢弃(我为什么秒杀一次都没有成功过呢A)
    2. 可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)

    1. 用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面
    2. 秒杀业务根据消息队列中的请求信息,再做后续处理.

    RabbitMQ集群搭建

    集群架构

    普通集群

    镜像集群

    按需学习

    参考视屏

    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利
  • 相关阅读:
    Jenkins安装部署及使用
    Jenkins详细安装与构建部署使用教程
    线程池使用拒绝策略时需要注意的坑
    线程池的4种拒绝策略
    neo4j allshortestpaths查询路径不准确问题
    程序员必备的网站之Tutorialspoint
    All shortest paths between a set of nodes
    Neo4j/Cypher: All paths between two nodes with a relationship property filter
    12款好看的英文字体下载《可以免费用于商业用途》
    国外经典设计:12个漂亮的移动APP网站案例
  • 原文地址:https://www.cnblogs.com/hhddd-1024/p/14508256.html
Copyright © 2011-2022 走看看