目录:
- 什么是消息中间件
- MQ的作用
- JMS规范与AMQP协议
- RabbitMQ组件
- 消息过期
- RabbitMQ实现延迟队列
- 持久化
- 事务
- 发送方确认机制
- RabbitMQ管理
- RabbitMQ集群
- 镜像队列
- 关于RabbitMQ性能优化的建议
- RabbitMQ实战
1、什么是消息中间件
Message Queue Middleware,简称MQ,是一种利用高效可靠的消息传递机制进行与平台无关的数据交互的技术。
2、MQ的作用
异步:类似于短信业务,将需要发送的消息放入MQ中,让其它主流程业务正常运行,异步监听短信消息。
解耦:利用MQ隐含的、基于数据的接口达到解耦;就像家里的电器(如冰箱、烤炉、热水器等),它们并不是直接接在电路上的,而是通过插座达到通电的目的。
削峰:请求一进来,便将数据放入MQ,然后依次执行,降低后端服务压力。
冗余:某些情况下处理数据失败就会丢失数据,此时可以将这些数据备份到MQ中,直到处理完再舍弃这些消息。
3、JMS规范与AMQP协议
Java Message Service,仅适用于java平台的消息中间件规范。
元素:连接工厂、JMS连接、JMS会话、JMS目的、JMS生产者、JMS消费者、Broker。
Advanced Message Queuing Protocol,支持不同语言的消息中间件规范。
组件:
a、生产者、消费者。
b、消息:包括有效载荷与标签;有效载荷是存储需要传输的数据,标签是描述有效载荷属性东西。
c、信道、交换器、队列、路由键:生产者将消息通过信道发送到交换器,交换器通过路由键将消息路由到不同的队列后供消费者消费。
RabbitMQ特性:高效可靠易扩展、消息确认机制、队列消息持久化、消息拒收、默认交换器、mandatory(消息防丢失)。
4、RabbitMQ组件
a、交换器:
交换器的类型:
- fanout:仅需交换器匹配。
- direct:BindingKey和RoutingKey完全匹配。
- topic:广播,类似于direct;但BindingKey允许使用通配符,#可匹配多个或零个单词,*匹配一个单词。
- header:根据消息内容中的headers属性进行匹配。
public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException
创建交换器参数说明:
- exchange:交换器名称。
- type:交换器类型。
- durable:是否持久化。
- autoDelete:是否自动删除,前提是必须要有解绑动作,且是全部与这个交换器解绑。
- internal:是否内置路由器,客户端无法直接发送消息到交换器,只能通过交换器路由到内置路由器。
- argument:其它结构化参数。
argument:alternate-exchange(备份交换器)
Map<String, Object> args = new HashMap<String, Object>(1); args.put("alternate-exchange", "exchangeName");
public AMQP.Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException
删除交换器参数说明:
- exchange:交换器名称。
- ifUnused:是否未使用,true=交换器未使用时才会被删除,false=直接删除。
b、队列:
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
定义队列参数说明:
- queue:队列名称。
- durable:是否持久化。
- exclusive:是否排他,true=仅对首次创建它的连接可见,并在连接段开始自动删除;适用于同一个客户端同时发送读写消息的场景。
- autoDelete:是否自动删除,当连接该队列的消费者都断开连接后,队列删除。
- arguments:定义队列的参数列表。
arguments:
- x-message-ttl:消息过期时间,单位ms。
- x-expries:静置消息删除时间,单位ms。
- x-max-length:队列消息最大长度。
- x-max-length-bytes:队列最大占用空间大小,单位B。
- x-dead-letter-exchange:死信队列交换器名称。
- x-dead-letter-routing-key:死信队列路由键。
- x-max-priority:队列优先级。
- x-queue-mode:将消息保存在磁盘上,不存在内存中,当消费者开始消费时才加载到内存中。
通过x-dead-letter-exchange、x-dead-letter-routing-key实现死信队列(DLX):
DLX实质就是一个交换器,只是在队列满足以下条件时会将消息转到DLX里:
- 消息被拒,且requeue=false
- 队列过期或队列达到最大长度
Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
删除队列参数说明:
- queue:队列名称。
- ifUnused:无消费者消费时删除。
- ifEmpty:队列无消息时删除。
c、生产者:
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
发送消息参数说明:
- exchange:交换器名称。
- routingKey:路由键。
- mandatory:true=当发送的消息无法根据交换器类型和路由键匹配到合适的队列上时(自身未满足条件),将消息返回给生产者;false=将消息丢弃。
- immediate:true=当路由到队列上时无任何消费者(他人不满足条件),则将消息返回给生产者;false=将消息丢弃。
- props:消息的属性。
- body:消息体。
d、消费者:
消费者有两种方式消费,一种是推(自动监听),一种是拉(手动获取单条消息)。
推:String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
拉:GetResponse basicGet(String queue, boolean autoAck) throws IOException;
参数说明:
- autoAck:是否自动确认消息已被消费。
- noLocal:设置为ture时,不能将同一个connection生产的消息在此connection消费,也就是说一个connection不能同时为生产者和消费者。
- exclusive:是否排他。
- callback:接收到消息的回调函数,用于处理具体逻辑。
消息的确认与拒绝:
当消费消息autoAck=false时,消费者可以手动确认消息已消费。
void basicAck(long deliveryTag, boolean multiple) throws IOException;
参数说明:
- multiple:true=确认消费该信道上deliveryTag标签之前所有未确认的消息。
拒绝单条消息:void basicReject(long deliveryTag, boolean requeue) throws IOException;
拒绝多条消息:void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
5、消息过期
RabbitMQ中消息过期分为两种,一种是队列过期,一种是消息过期。
队列过期就是设置queueDeclare的arguments的x-message-ttl,消息过期是通过发送消息的props参数确定。
AMQP.BasicProperties.Builder publishBuilder = new AMQP.BasicProperties.Builder(); // expiration单位ms publishBuilder.expiration("10000"); channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_NAME, publishBuilder.build(), "ttl".getBytes());
对于第一种TTL来说,队列一但过期就会删除掉;但对于第二种TTL来说,队列过期不会马上删除,而是等队列要被消费时再判断是否要删除。
那为什么会不一样呢,我们都知道mq对性能的要求是非常高的,如果第二种TTL的方式也要及时删除的话势必要扫描整个队列,这样的话,若队列长度较大是性能便会非常的差。
而第一种为什么可以做到及时删除呢,我们知道队列具有先进先出的特性,所以先入队的肯定要比后入队的要先过期,所以只要删除头部的就好啦。
而第二种的消息过期时间都是不固定的,考虑到MQ的性能,所以采用了上述的方式。
6、RabbitMQ实现延迟队列
a、DB轮询:通过job或其它逻辑将订单表的必要字段查出(如:orderId、createTime、status),当订单超过xx时间,将状态置为失效。
b、JDK DelayQueue:java api提供的延迟队列的实现,通过poll()、take()方法获取超时任务。
c、Redis sortedSet:通过zset类型的score来实现。
d、RabbitMQ TTL + DLX:使用RabbitMQ的过期时间和死信队列实现。
7、持久化
a、客户端不要设置自动确认消息(autoAck),而是由服务端确认。
b、发送端确认机制、镜像队列。
8、事务
a、开启事务:channel.txSelect()
b、提交事务:channel.txCommit()
c、回滚事务:channel.txRollback()
9、发送方确认机制
因为事务太过重量了,严重影响的RabbitMQ的吞吐量,所以RabbitMQ提供了一种更为轻量的方式,来保证生产者发送的消息真真的到达了RabbitMQ。
a、首先需要将信道设置为发送方确认模式
channel.confirmSelect();
b、然后通过waitForConfirms()或waitForConfirmsOrDie()确认消息已发送成功
boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;
void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;
public class PublisherConfirmProduct { private static final String EXCHANGE_NAME = "demo.exchange"; private static final String ROUTING_KEY = "demo.routingkey"; private static final String QUEUE_NAME = "demo.queue"; private static final String MESSAGE = "Hello World!"; /** * 单条确认 */ public static void commonConfirm() throws Exception { Connection connection = RabbitMqUtils.getConnection(); Channel channel = initChannel(connection); channel.confirmSelect(); for (int i = 0; i < 100; i++) { channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE.getBytes()); if (channel.waitForConfirms()) { // 逐条确认是否发送成功 System.out.println("send success!"); } } RabbitMqUtils.close(connection, channel); } /** * 批量确认 */ public static void batchConfirm() throws Exception { Connection connection = RabbitMqUtils.getConnection(); Channel channel = initChannel(connection); channel.confirmSelect(); for (int i = 0; i < 100; i++) { channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE.getBytes()); } // 批量确认是否发送成功,如果某一次确认失败这一批都要重新发送 if (channel.waitForConfirms()) { System.out.println("send success!"); } RabbitMqUtils.close(connection, channel); } /** * 异步确认 */ public static void asyncConfirm() throws Exception { Connection connection = RabbitMqUtils.getConnection(); Channel channel = initChannel(connection); channel.basicQos(1); channel.confirmSelect(); // 定义一个未确认消息集合 final SortedSet<Long> unConfirmSet = Collections.synchronizedNavigableSet(new TreeSet<>()); for (int i = 0; i < 100; i++) { channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE.getBytes()); unConfirmSet.add(channel.getNextPublishSeqNo()); } channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.err.println(format("拒绝消息 deliveryTag:{0}, multiple:{1}", deliveryTag, multiple)); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.err.println(format("确认消息 deliveryTag:{0}, multiple:{1}", deliveryTag, multiple)); if (multiple) { // multiple为true,则deliveryTag之前的所有消息全部被确认 unConfirmSet.headSet(deliveryTag + 1).clear(); } else { // 否则只确认一条消息 unConfirmSet.remove(deliveryTag); } } }); TimeUnit.SECONDS.sleep(5); System.out.println(unConfirmSet.size()); RabbitMqUtils.close(connection, channel); } private static Channel initChannel(Connection connection) throws IOException { Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null); channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); return channel; } public static void main(String[] args) throws Exception { // commonConfirm(); // batchConfirm(); asyncConfirm(); } }
10、RabbitMQ管理
a、vhost:
Vhost(virtual host),其本质是一个独立的小型RabbitMQ服务器,拥有自己独立的队列、交换器以及绑定关系等,并且它拥有自己独立的权限。
客户端连接RabbitMQ时必须制定一个vhost,默认使用"/"。
也就是说一个RabbitMQ服务可以拥有很多个子RabbitMQ服务,这些子RabbitMQ服务可以为其它很多应用程序提供服务;这样可以减少RabbitMQ服务器搭建的成本。
b、角色与权限:
权限控制是以vhost为单位,创建用户时至少要指定一个vhost;对于用户来说可以跨vhost授权,不同的vhost可以有不同的权限。
角色:
- none:无任何角色,新创建的用户默认角色为none。
- management:可以访问web管理页面。
- policymaker:包含management的所有权限,并可以管理策略(Policy) 和参数(Parameter)。
- monitoring:包含management的所有权限,并可以看到所有连接、信道及节点相关信息。
- administartor:包含minitoring的所有权限,并可以管理用户、虚拟主机、权限、策略、参数等。
11、RabbitMQ集群
a、为什么要集群:人多力量大,通过线性扩展来提高RabbitMQ的吞吐量。
b、集群后就能保证RabbitMQ的消息不丢失嘛:当然不能保证,默认情况下RabbitMQ的消息是不会在集群中复制的,节点宕机后消息就会丢失。
c、队列和交换器再集群中是以什么样的形式存在:队列不会复制,其它节点只存储了此队列所在节点的元数据;而交换器本质就是一个hashMap的映射,节点间会进行复制。
d、关于集群的建议:集群中至少要有一个磁盘节点(也就是持久化的RabbitMQ节点),虽然磁盘节点挂掉了依然可以发送和接受消息,但却不能执行创建队列、交换器、绑定关系等等操作。高可用的话建议至少两个磁盘节点,如果不确认如何选择磁盘节点与内存节点时建议全部选择磁盘节点,但这样的话会在一定程度上影响RabbitMQ的吞吐量。
12、镜像队列
RabbitMQ集群并不能保证队列的复制,而镜像队列就是用来解决这一问题的。它可以将对队列镜像到其它集群节点中,若集群中一个节点失效了它会自动切换到另一个镜像的节点上,来保证服务的可用性。
13、关于RabbitMQ性能优化的建议
影响RabbitMQ性能的因素有很多,主要的分为硬件性能与软件性能。
)硬件性能:如网络、内存、CPU等等。
)软件性能:消息持久化、消息确认、路由算法与绑定规则等等。
1、消息持久化:持久化会写入磁盘,多一次IO操作,设置非持久化可提升性能(durable=false)。
2、消息确认:消费者订阅队列时,设置自动确认也可以提升性能(autoAck=true)。
3、路由算法与绑定规则:fanout,只要绑定了交换器就可以匹配到,匹配规则少,性能肯定是绑定规则中最佳的;direct,除了交换器还需要匹配路由键,性能次之;topic,最复杂的匹配规则,相对其它两个性能最差。(比较fanout > direct > topic)