1基础
1.核心思想:接收并转发消息。你可以把它想象成一个邮局。
2.Proucer(pu rao 色):生产者(消息发送者)。
3.Queue :邮箱,接收(一个或多个)proucer生产的消息,提供给一个或多个消费者消费。
4.Consumer(抗苏门):消费者
5.注意:proucer和queue还有consumer可以分开在单独服务器上
6.MQ(Message Queue):消息队列,队列(先进先出,有三个作用:入列、存储、出列)
7.消息队列的特征:
(1) 业务无关
(2) FIFO(先进先出)
(3) 容灾
(4) 性能:吞吐量内部通信
8.使用消息队列原因
(1) 系统解耦:系统a(消费卷)产生消息 系统b(订单)/缓存模块接受消息。
① 如果是系统a去产生消息给b、c(直接调用b、c接口),比较耗时
② 不易维护
③ a产生消息给消息队列,然后b、c去订阅消息
(2) 异步调用(同步:需要等返回结果,然后在进行下一个。异步:同时进行多个,不用等结果返回)
(3) 流量削峰:比如每天只有半个小时是流量高峰,现有的服务器数量不足以支撑。加服务器会浪费、不加这段时间搞不过去。所以使用消息队列,先将所有请求存储,然后按照服务器可接受范围吐出。只到流量高峰过去
9.使用举例:【此段举例转载】
以熟悉的电商场景为例,如果商品服务和订单服务是两个不同的微服务,在下单的过程中订单服务需要调用商品服务进行扣库存操作。按照传统的方式,下单过程要等到调用完毕之后才能返回下单成功,如果网络产生波动等原因使得商品服务扣库存延迟或者失败,会带来较差的用户体验,如果在高并发的场景下,这样的处理显然是不合适的,那怎么进行优化呢?这就需要消息队列登场了。
消息队列提供一个异步通信机制,消息的发送者不必一直等待到消息被成功处理才返回,而是立即返回。消息中间件负责处理网络通信,如果网络连接不可用,消息被暂存于队列当中,当网络畅通的时候在将消息转发给相应的应用程序或者服务,当然前提是这些服务订阅了该队列。如果在商品服务和订单服务之间使用消息中间件,既可以提高并发量,又降低服务之间的耦合度。
10.AMQP协议中间的几个重要概念:【此段转载】
- Server:接收客户端的连接,实现AMQP实体服务。
- Connection:连接,应用程序与Server的网络连接,TCP连接。
- Channel:信道,消息读写等操作在信道中进行。客户端可以建立多个信道,每个信道代表一个会话任务。
- Message:消息,应用程序和服务器之间传送的数据,消息可以非常简单,也可以很复杂。有Properties和Body组成。Properties为外包装,可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息体内容。
- Virtual Host:虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名称的Exchange或Queue。
- Exchange:交换器,接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ常用的交换器常用类型有direct、topic、fanout、headers四种,后面详细介绍。
- Binding:绑定,交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个RoutingKey。
- RoutingKey:路由键,生产者将消息发送给交换器的时候,会发送一个RoutingKey,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。路由键通常为一个“.”分割的字符串,例如“com.rabbitmq”。
- Queue:消息队列,用来保存消息,供消费者消费。
2RabbitMq特点
- 开源、跨语言
- Erlang语言编写(性能好、延迟低)
- 应用广泛
- 社区活跃、api丰富
3AMQP协议(Advanced Message Queuing Protocol)
4RabbitMQ核心概念
- Server:服务
- connection:与Server建立连接
- channel:信道,几乎所有的操作都在信道上进行,客户端可以建立多个信道。
- message:消息,由properties和body组成
- virtual host:虚拟主机,顶层隔离。同一个虚拟主机下,不能有重复的交换机和queue
- Exchange:交换机,接收生产者的消息的,然后根据指定的路由器去把消息转发到所绑定的队列上 u binding:绑定交换机和队列
- routing key:路由键,路由规则,虚拟机可以用它来确定这个消息如何进行一个路由
- queue:队列,消费者只需要监听队列来消费消息,不需要关注消息来自于哪个Exchange
- Exchange和Message Queue存在着绑定的关系,一个Exchage可以绑定多个消息队列
5消息流转过程
6创建一个简单的发送接收例子
1.1首先给账号授权
1.2创建java项目,并且引入相应的maven
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.10.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-nop --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-nop</artifactId> <version>1.7.30</version> <scope>test</scope> </dependency> </dependencies>
1.3创建发送者
package item.com; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** 描述: 的发送类,连接到RabbitMQ服务端, 然后发送一 条消息,然后退出。 */ public class Send { private final static String QUEUE_NAME="hello"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置 RabbitMQ 地址 todo setHost设置腾讯云服务器就不行,需要周末排查以下 connectionFactory.setHost("127.0.0.1"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); //建立连接 Connection connection = connectionFactory.newConnection(); //获得信道 Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); // channel.queueDeclare(QUEUE_NAME,是否需要持久(服务器重启是否还在),队列是否独有,是否需要自动删除,参数); //发布消息 String message = "Hello World2233!"; channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8")); // channel.basicPublish(交换机,QUEUE_NAME,额外配置,message.getBytes("UTF-8")); System.out.println("消息发送"+message); //关闭 channel.close();; connection.close(); } }
1.4创建接收者
package item.com; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 接受消息 */ public class Recv { private final static String QUEUE_NAME = "hello"; //接受消息不用关闭,一直打开接收 public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置 RabbitMQ 地址 todo setHost设置腾讯云服务器就不行,需要周末排查以下 connectionFactory.setHost("127.0.0.1"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); //建立连接 Connection connection = connectionFactory.newConnection(); //获得信道 Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // channel.queueDeclare(QUEUE_NAME,是否需要持久(服务器重启是否还在),队列是否独有,是否需要自动删除,参数); //接受消息 channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body, "UTF-8"); System.out.println("收到消息:"+s); } }); // channel.basicConsume(QUEUE_NAME,是否通知发送者已经签收-确认,处理消息) } }
1.5查看运行
7多消费者平均压力
2.1多消费者
消费者可以同时启动多个,比如Recv1、Recv2(代码完全相同)。这样就会平均的去处理消息(数量上平均)。但是不同的消息耗时不同。又导致不平均
2.2平均消费者【下列红色代码部分】,这样就根据时间来而不是数量平均
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 接受消息 */ public class Recv3 { private final static String QUEUE_NAME = "hello"; //接受消息不用关闭,一直打开接收 public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置 RabbitMQ 地址 todo setHost设置腾讯云服务器就不行,需要周末排查以下 connectionFactory.setHost("127.0.0.1"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); //建立连接 Connection connection = connectionFactory.newConnection(); //获得信道 final Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // channel.queueDeclare(QUEUE_NAME,是否需要持久(服务器重启是否还在),队列是否独有,是否需要自动删除,参数); //接受消息 channel.basicQos(1);//处理完之前不接受下一个任务 channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body, "UTF-8"); System.out.println(" [x] Received '" + s + "'"); channel.basicAck(envelope.getDeliveryTag(), false); } }); // channel.basicConsume(QUEUE_NAME,是否通知发送者已经签收-确认,处理消息) } }