什么是JMS?
- JMS
-
Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信;Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持(JMS接口实现如RocketMQ,Kafka,ActiveMQ);
- JMS是Sun公司早期提出的消息标准,旨在为Java应用提供统一的消息操作,它是一种与厂商无关的API,用来访问消息收发系统消息;JMS是针对java的,而微软开发了NMS(.NET消息传递服务);
-
特性
- 面向Java平台的标准消息传递API;
- 在Java或JVM语言比如Scala、Groovy中具有互用性;
- 无需担心底层协议;
- 有queues和topics两种消息传递模型;
- 支持事务、能够定义消息格式(消息头、属性和内容);
- 常见概念
- JMS提供者:连接面向消息中间件的,JMS接口的一个实现,RocketMQ,ActiveMQ,Kafka等;
- JMS生产者(Message Producer):生产消息的服务;
- JMS消费者(Message Consumer):消费消息的服务;
- JMS消息:数据对象;
- JMS队列:存储待消费消息的区域;
- JMS主题:一种支持发送消息给多个订阅者的机制;
- JMS消息通常有两种类型:点对点(Point-to-Point)、发布/订阅(Publish/Subscribe);
- 基础编程模型
什么是AMQP
- 高级消息队列协议即Advanced Message Queuing Protocol(AMQP)是面向消息中间件提供的开放的应用层协议,其设计目标是对于消息的排序、路由(包括点对点和订阅-发布)、保持可靠性、保证安全性;AMQP规范了消息传递方和接收方的行为,以使消息在不同的提供商之间实现互操作性,就像SMTP,HTTP,FTP等协议可以创建交互系统一样;与先前的中间件标准(如Java消息服务)不同的是,JMS在特定的API接口层面和实现行为上进行了统一,而高级消息队列协议则关注于各种消息如何以字节流的形式进行传递;因此,使用了符合协议实现的任意应用程序之间可以保持对消息的创建、传递;
-
AMQP和JMS的主要区别
- AMQP不从API层进行限定,直接定义网络交换的数据格式,这使得实现了AMQP的provider天然性就是跨平台;
- 比如Java语言产生的消息,可以用其他语言(比如python的)进行消费;
- AQMP可以用http来进行类比,不关心实现接口的语言,只要都按照相应的数据格式去发送报文请求,不同语言的client可以和不同语言的server进行通讯;
- JMS消息类型:TextMessage/ObjectMessage/StreamMessage等;
- AMQP消息类型:Byte[];
RabbitMQ概念
- RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、C、用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不错,与Spring AMQP完美的整合、API丰富易用;
- 文档:https://www.rabbitmq.com/getstarted.html
- 核心概念
-
Broker
- RabbitMQ的服务端程序,可以认为一个mq节点就是一个broker;
-
Producer 生产者
- 创建消息Message,然后发布到RabbitMQ中;
-
Consumer 消费者
- 消费队列里面的消息;
-
Message 消息
- 生产消费的内容,有消息头和消息体,也包括多个属性配置,比如RoutingKey路由键;
-
Queue 队列
- Queue是RabbitMQ 的内部对象,用于存储消息,消息都只能存储在队列中;
-
Channel 信道
- 一条支持多路复用的通道,独立的双向数据流通道,可以发布、订阅、接收消息;
- 信道是建立在真实的TCP连接内的虚拟连接,复用TCP连接的通道;
-
Connection 连接
- Connection是RabbitMQ的socket连接,它封装了socket协议相关部分逻辑,一个连接上可以有多个channel进行通信;
-
Exchange 交换机
- 生产者将消息发送到 Exchange,交换机将消息路由到一个或者多个队列中,里面有多个类型,队列和交换机是多对多的关系;
-
RoutingKey 路由键
- 生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则;
- 最大长度255 字节;
-
Binding 绑定
- 通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键 ( BindingKey ),这样RabbitMQ就可以将消息路由到对应的队列;(生产者将消息发送给交换器时,需要一个RoutingKey,当BindingKey和 RoutingKey相匹配时,消息会被路由到对应的队列中)
- Virtual host 虚拟主机
-
用于不同业务模块的逻辑隔离,一个Virtual Host里面可以有若干个Exchange和Queue,同一个VirtualHost 里面不能有相同名称的Exchange或Queue;默认是/;
-
-
-
-
- 添加虚拟主机
-
-
- 关系图
-
- 主要端口
4369 erlang 发现口 5672 client 端通信口 15672 管理界面 ui 端口 25672 server 间内部通信口
Web管控台
默认账号密码:guest/guest;
每个虚拟主机默认就有7个交换机
简单队列
参考:[https://www.rabbitmq.com/tutorials/tutorial-one-java.html]
一个消息生成者对应一个消息消费者,点对点
测试如下:
消息生成者
public class SimpleQueueSender { private final static String QUEUE_NAME = "simple_queue"; private final static Logger LOG = LoggerFactory.getLogger(SimpleQueueSender.class); public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.135"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); try ( // 创建连接 Connection connection = factory.newConnection(); // 创建信道 Channel channel = connection.createChannel()) { /** * 队列名称 * 持久化配置:mq重启后还在 * 是否独占:只能有一个消费者监听队列;当connection关闭是否删除队列,一般是false,发布订阅是独占 * 自动删除: 当没有消费者的时候,自动删除掉,一般是false * 其他参数 * * 队列不存在则会自动创建,如果存在则不会覆盖,所以此时的时候需要注意属性 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; /** * 参数说明: 交换机名称:不写则是默认的交换机,那路由健需要和队列名称一样才可以被路由, 路由健名称 配置信息 发送的消息数据:字节数组 */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); LOG.info(" [x] Sent '" + message + "'"); } } }
消息消费者
public class SimpleQueueReceiver { private final static String QUEUE_NAME = "simple_queue"; private final static Logger LOG = LoggerFactory.getLogger(SimpleQueueReceiver.class); public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.135"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); //消费者一般不增加自动关闭 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); LOG.info(" [*] Waiting for messages. To exit press CTRL+C"); //回调方法,下面两种都行 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // consumerTag 是固定的 可以做此会话的名字, deliveryTag 每次接收消息+1 LOG.info("consumerTag消息标识="+consumerTag); //可以获取交换机,路由健等 LOG.info("envelope元数据="+envelope); LOG.info("properties配置信息="+properties); LOG.info("body="+new String(body, StandardCharsets.UTF_8)); } }; channel.basicConsume(QUEUE_NAME,true, consumer); // DeliverCallback deliverCallback = (consumerTag, envelop, delivery,properties, msg) -> { // String message = new String(msg, "UTF-8"); // System.out.println(" [x] Received '" + message + "'"); // }; //自动确认消息 //channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
消息生成者
消息消费者
工作队列
参考:[https://www.rabbitmq.com/tutorials/tutorial-two-java.html]
-
消息生产能力大于消费能力,增加多几个消费节点;
-
和简单队列类似,增加多个几个消费节点,处于竞争关系;
-
策略有轮询和非公平,默认为轮询;
- 轮询模式代码测试:
消息生成者
public class WorkQueueSender { private final static String QUEUE_NAME = "work_mq"; private final static Logger LOG = LoggerFactory.getLogger(WorkQueueSender.class); public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.135"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); try ( // 创建连接 Connection connection = factory.newConnection(); // 创建信道 Channel channel = connection.createChannel()) { /** * 队列名称 持久化配置 排他配置 自动删除 其他参数 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 轮训发送 10个 for (int i = 0; i < 10; i++) { String message = "Hello World!" + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); LOG.info(" [x] Sent '" + message + "'"); } } } }
消息消费者1
public class WorkQueueReceiver1 { private final static String QUEUE_NAME = "work_mq"; private final static Logger LOG = LoggerFactory.getLogger(WorkQueueReceiver1.class); public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.135"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); LOG.info(" [*]Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { //模拟消费缓慢 try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } String message = new String(delivery.getBody(), StandardCharsets.UTF_8); LOG.info("[x] Received '" + message + "'"); //手工确认消息消费,不是多条确认 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; //关闭自动确认消息 channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } }
消息消费者2
public class WorkQueueReceiver2 { private final static String QUEUE_NAME = "work_mq"; private final static Logger LOG = LoggerFactory.getLogger(WorkQueueReceiver1.class); public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.135"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); LOG.info(" [*]Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { //模拟消费缓慢 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } String message = new String(delivery.getBody(), StandardCharsets.UTF_8); LOG.info("[x] Received '" + message + "'"); //手工确认消息消费,不是多条确认 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; //关闭自动确认消息 channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } }
先启动2个消费者,再启动生成者;存在部分节点消费过快,部分节点消费慢,导致不能合理处理消息;
消息生成者
消息消费者1
消息消费者2
- 公平策略模式代码测试
消费者1修改如下:
public class WorkQueueReceiver1 { private final static String QUEUE_NAME = "work_mq"; private final static Logger LOG = LoggerFactory.getLogger(WorkQueueReceiver1.class); public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.135"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); LOG.info(" [*]Waiting for messages. To exit press CTRL+C"); //限制消费者每次只能消费一条消息,处理完成后才能处理下一条 int fetchCount = 1; channel.basicQos(fetchCount); DeliverCallback deliverCallback = (consumerTag, delivery) -> { //模拟消费缓慢 try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } String message = new String(delivery.getBody(), StandardCharsets.UTF_8); LOG.info("[x] Received '" + message + "'"); //手工确认消息消费,不是多条确认 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; //关闭自动确认消息 channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } }
消费者2修改如下:
public class WorkQueueReceiver2 { private final static String QUEUE_NAME = "work_mq"; private final static Logger LOG = LoggerFactory.getLogger(WorkQueueReceiver1.class); public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.135"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); LOG.info(" [*]Waiting for messages. To exit press CTRL+C"); //限制消费者每次只能消费一条消息,处理完成后才能处理下一条 int fetchCount = 1; channel.basicQos(fetchCount); DeliverCallback deliverCallback = (consumerTag, delivery) -> { //模拟消费缓慢 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } String message = new String(delivery.getBody(), StandardCharsets.UTF_8); LOG.info("[x] Received '" + message + "'"); //手工确认消息消费,不是多条确认 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; //关闭自动确认消息 channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } }
消费者1
消费者2
发布订阅模式
参考:[https://www.rabbitmq.com/tutorials/tutorial-three-java.html]
- RabbitMQ的Exchange 交换机
- 生产者将消息发送到 Exchange,交换器将消息路由到一个或者多个队列中,交换机有多个类型,队列和交换机是多对多的关系;
- 交换机只负责转发消息,不具备存储消息的能力,如果没有队列和exchange绑定,或者没有符合的路由规则,则消息会被丢失;
- 交换机类型
-
Direct Exchange 定向
- 将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配;eg:如果一个队列绑定到该交换机上要求路由键 “aabb”,则只有被标记为“aabb”的消息才被转发,不会转发aabb.cc,也不会转发gg.aabb,只会转发aabb;
-
Fanout Exchange 广播
- 只需要简单的将队列绑定到交换机上,一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上;很像子网广播,每台子网内的主机都获得了一份复制的消息;
- Fanout交换机转发消息是最快的,用于发布订阅,广播形式;
-
Topic Exchange 通配符
- topic交换机是一种发布/订阅的模式,结合了直连交换机与扇形交换机的特点;通过过交换机和队列绑定到指定的【通配符路由键】,生产者发送消息到交换机,交换机根据消息的路由key进行转发到对应的队列,消息要指定routingkey路由键;
- 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词;eg:因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.*” 只会匹配到“abc.def”;
-
Headers Exchanges(少用)
- 根据发送的消息内容中的headers属性进行匹配, 在绑定Queue与Exchange时指定一组键值对;
- 当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;
- 如果完全匹配则消息会路由到该队列,否则不会路由到该队列;
-
- 发布-订阅模型中,消息生产者不再是直接面对queue(队列名称),而是直面Exchange(交换机),都需要经过exchange来进行消息的发送, 所有发往同一个fanout交换机(广播)的消息都会被所有监听这个交换机的消费者接收到;
- 通过把消息发送给交换机,交互机转发给对应绑定的队列;
- 交换机绑定的队列是排它(独占队列),第一次创建它的Connection有效,当Connection关闭时,该队列自动删除;
消息生产者测试代码:
public class PubSender { private final static String EXCHANGE_NAME = "exchange_fanout"; private final static Logger LOG = LoggerFactory.getLogger(PubSender.class); public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.135"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); /** * 消息生产者不用过多操作,只需要和交换机绑定即可 */ try (// 创建连接 Connection connection = factory.newConnection(); // 创建信道 Channel channel = connection.createChannel()) { // 绑定交换机,fanout扇形,即广播类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String message = "Hello World pub !"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8)); LOG.info(" [x] Sent '" + message + "'"); } } }
消息消费者测试代码:
public class SubReceiver1 { private final static String EXCHANGE_NAME = "exchange_fanout"; private final static Logger LOG = LoggerFactory.getLogger(PubSender.class); public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.135"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); // 消费者一般不增加自动关闭 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 绑定交换机,fanout扇形,即广播类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); // 获取队列(排它队列) String queueName = channel.queueDeclare().getQueue(); // 绑定队列和交换机,fanout交换机不用指定routingkey channel.queueBind(queueName, EXCHANGE_NAME, ""); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); LOG.info(" [x] Received '" + message + "'"); }; // 自动确认消息 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
启动2个消费节点,1生产节点;
消息生成者
消息消费者1
消息消费者2
路由模式
参考:[https://www.rabbitmq.com/tutorials/tutorial-four-java.html]
- 交换机类型是Direct,队列和交换机绑定,需要指定一个路由key( 也叫Bingding Key);消息生产者发送消息到交换机,需要指定routingKey,交换机根据消息的路由key,转发给对应的队列;
消息生产测试代码
public class RoutingSender { private final static String EXCHANGE_NAME = "exchange_direct"; private final static Logger LOG = LoggerFactory.getLogger(RoutingSender.class); public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.135"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); /** * 消息生产者不用过多操作,只需要和交换机绑定即可 */ try (// 创建连接 Connection connection = factory.newConnection(); // 创建信道 Channel channel = connection.createChannel()) { // 绑定交换机,直连交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String error = "error log"; String info = "info log"; String debug = "debug log"; channel.basicPublish(EXCHANGE_NAME, "errorRoutingKey", null, error.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME, "infoRoutingKey", null, info.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME, "debugRoutingKey", null, debug.getBytes(StandardCharsets.UTF_8)); LOG.info("消息发送成功"); } } }
消息消费者测试代码
public class RoutingReceiver { private final static String EXCHANGE_NAME = "exchange_direct"; private final static Logger LOG = LoggerFactory.getLogger(RoutingSender.class); public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.135"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); // 消费者一般不增加自动关闭 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 绑定交换机,fanout扇形,即广播类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 获取队列 String queueName = channel.queueDeclare().getQueue(); // 绑定队列和交换机,另外一个节点只绑定一个 errorRoutingKey channel.queueBind(queueName, EXCHANGE_NAME, "errorRoutingKey"); channel.queueBind(queueName, EXCHANGE_NAME, "infoRoutingKey"); channel.queueBind(queueName, EXCHANGE_NAME, "debugRoutingKey"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); LOG.info(" [x] Received '" + message + "'"); }; // 自动确认消息 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
消息生产者
消息消费者
Topic主题通配符模式
参考:[https://www.rabbitmq.com/tutorials/tutorial-five-java.html]
当业务有很多的路由key,可以使用Topic主题通配符模式匹配;
-
交换机是 topic, 可以实现发布订阅模式fanout和路由模式Direct 的功能,更加灵活,支持模式匹配,通配符等;
- 交换机同过通配符进行转发到对应的队列,* 代表一个词,#代表1个或多个词,一般用#作为通配符居多;
quick.orange.rabbit 只会匹配 *.orange.* 和 *.*.rabbit ,进到Q1和Q2 lazy.orange.elephant 只会匹配 *.orange.* 和 lazy.#,进到Q1和Q2 quick.orange.fox 只会匹配 *.orange.*,进入Q1 lazy.brown.fox 只会匹配azy.#,进入Q2 lazy.pink.rabbit 只会匹配 lazy.#和*.*.rabbit ,同个队列进入Q2(消息只会发一次)
- 交换机和队列绑定时用的binding key使用通配符的路由键;
- 生产者发送消息时需要使用具体的路由键;
消息生产者测试代码
public class TopicSender { private final static String EXCHANGE_NAME = "exchange_topic"; private final static Logger LOG = LoggerFactory.getLogger(TopicSender.class); public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.135"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); /** * 消息生产者不用过多操作,只需要和交换机绑定即可 */ try (// 创建连接 Connection connection = factory.newConnection(); // 创建信道 Channel channel = connection.createChannel()) { // 绑定交换机,直连交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String error = "error log"; String info = "info log"; String debug = "debug log"; channel.basicPublish(EXCHANGE_NAME, "order.log.error", null, error.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME, "order.log.info", null, info.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME, "product.log.debug", null, debug.getBytes(StandardCharsets.UTF_8)); LOG.info("消息发送成功"); } } }
消息消费者1测试代码
public class TopicReceiver1 { private final static String EXCHANGE_NAME = "exchange_topic"; private final static Logger LOG = LoggerFactory.getLogger(TopicReceiver1.class); public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.135"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); //消费者一般不增加自动关闭 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //绑定交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //获取队列 String queueName = channel.queueDeclare().getQueue(); //绑定队列和交换机,第一个节点 channel.queueBind(queueName,EXCHANGE_NAME,"order.log.error"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); LOG.info(" [x] Received '" + message + "'"); }; //自动确认消息 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
消息消费者2测试代码
public class TopicReceiver2 { private final static String EXCHANGE_NAME = "exchange_topic"; private final static Logger LOG = LoggerFactory.getLogger(TopicReceiver2.class); public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.135"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); //消费者一般不增加自动关闭 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //绑定交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //获取队列 String queueName = channel.queueDeclare().getQueue(); //绑定队列和交换机,第二个节点 channel.queueBind(queueName,EXCHANGE_NAME,"*.log.*"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); LOG.info(" [x] Received '" + message + "'"); }; //自动确认消息 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
消息生成者
消息消费者1
消息消费者2
如何保证消息可靠性
发送方
需要使用RabbitMQ发送端确认机制,确认消息成功发送到RabbitMQ并被处理;
需要使用RabbitMQ消息返回机制,若没有发现目标队列,中间件会通知发送方;
- 单条同步确认机制
配置channel,开启确认模式:channel.confirmSelect();每发送一条消息,调用channel.waitForConfirms()方法,等待确认;
测试代码如下:
public class TopicSender { private final static String EXCHANGE_NAME = "exchange_topic"; private final static Logger LOG = LoggerFactory.getLogger(TopicSender.class); public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.135"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); /** * 消息生产者不用过多操作,只需要和交换机绑定即可 */ try (// 创建连接 Connection connection = factory.newConnection(); // 创建信道 Channel channel = connection.createChannel()) { // 发送端确认 channel.confirmSelect(); // 绑定交换机,直连交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String info = "info log"; channel.basicPublish(EXCHANGE_NAME, "order.log.info", null, info.getBytes(StandardCharsets.UTF_8)); LOG.info("消息发送成功"); // 等待确认 if (channel.waitForConfirms()) { LOG.info("RabbitMQ confirm success"); } else { LOG.info("RabbitMQ confirm fail"); } } } }
- 多条同步确认机制
配置channel,开启确认模式:channel.confirmSelect();发送多条消息,调用channel.waitForConfirms()方法,等待确认;
public class TopicSender2 { private final static String EXCHANGE_NAME = "exchange_topic"; private final static Logger LOG = LoggerFactory.getLogger(TopicSender2.class); public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.135"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); /** * 消息生产者不用过多操作,只需要和交换机绑定即可 */ try (// 创建连接 Connection connection = factory.newConnection(); // 创建信道 Channel channel = connection.createChannel()) { // 发送端确认 channel.confirmSelect(); // 绑定交换机,直连交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String info = "info log"; for (int i = 0; i < 10; i++) { channel.basicPublish(EXCHANGE_NAME, "order.log.info", null, info.getBytes(StandardCharsets.UTF_8)); LOG.info("消息发送成功"); } // 等待确认 if (channel.waitForConfirms()) { LOG.info("RabbitMQ confirm success"); } else { LOG.info("RabbitMQ confirm fail"); } } } }
- 异步确认
配置channel,开启确认模式:channel.confirmSelect();
在channel上添加监听:addConfirmListener,发送消息后,会回调此方法,通知是否发送成功;
异步确认可能是单条,也可能是多条,取决于MQ;
public class TopicSender3 { private final static String EXCHANGE_NAME = "exchange_topic"; private final static Logger LOG = LoggerFactory.getLogger(TopicSender3.class); public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.135"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); /** * 消息生产者不用过多操作,只需要和交换机绑定即可 */ try (// 创建连接 Connection connection = factory.newConnection(); // 创建信道 Channel channel = connection.createChannel()) { // 发送端确认 channel.confirmSelect(); channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { // deliveryTag为发送端的消息序号,multiple为多条还是单条 LOG.info("Ack, deliveryTag: {}, multiple: {}", deliveryTag, multiple); } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { LOG.info("Nack, deliveryTag:{}, multiple:{}", deliveryTag, multiple); } }); // 绑定交换机,直连交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String info = "info log"; for (int i = 0; i < 10; i++) { channel.basicPublish(EXCHANGE_NAME, "order.log.info", null, info.getBytes(StandardCharsets.UTF_8)); LOG.info("消息发送成功"); } Thread.sleep(5000L); } } }
deliveryTag为发送端的序号,multiple为多条还是单条;
从上面这张图可以看出,消息发送和异步确认不是同一个线程;
Ack, deliveryTag: 1, multiple: false,意思是RabbitMQ确认发送的第一条消息,multiple: false消息发送确认1条;
Ack, deliveryTag: 2, multiple: false,意思是RabbitMQ确认发送的第二条消息,multiple: false消息发送确认1条;
Ack, deliveryTag: 6, multiple: true,意思是RabbitMQ确认发送第六条消息,multiple: true意思是六条消息前面的所有消息都确认;
Ack, deliveryTag: 7, multiple: false,意思是RabbitMQ确认发送的第七条消息,multiple: false消息发送确认1条;
Ack, deliveryTag: 10, multiple: true,意思是RabbitMQ确认发送第十条消息,multiple: true意思是十条消息前面的所有消息都确认;
注:deliveryTag与Channel有关,当发送端部署多节点的时候,这会有多个Channel,deliveryTag可能不唯一;
- 消息返回机制
消息发送后,中间件会对消息进行路由;若没有发现目标队列,中间件会通知发送方;Return Listener会被调用;
消息返回开启配置项:Mandatory;若Mandatory为false,RabbitMQ将直接丢弃无法路由的消息;
com.rabbitmq.client.Channel#basicPublish(java.lang.String, java.lang.String, boolean, com.rabbitmq.client.AMQP.BasicProperties, byte[])
其中第三个参数为Mandatory;
消息发送方往交换机发送一个不存在的rountingKey;
消息发送方测试代码如下:
public class RoutingSender { private final static String EXCHANGE_NAME = "exchange_direct"; private final static Logger LOG = LoggerFactory.getLogger(RoutingSender.class); public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.135"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); /** * 消息生产者不用过多操作,只需要和交换机绑定即可 */ try (// 创建连接 Connection connection = factory.newConnection(); // 创建信道 Channel channel = connection.createChannel()) { // 绑定交换机,直连交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { LOG.info( "Message Return: replyCode:{}, replyText:{}, exchange:{},routingKey:{}, properties:{},body:{}", replyCode, replyText, exchange, routingKey, properties, body); } }); // channel.addReturnListener(new ReturnCallback() { // @Override // public void handle(Return returnMessage) { // LOG.info("Message Return: returnMessage:{}", returnMessage); // } // }); String info = "info log"; channel.basicPublish(EXCHANGE_NAME, "infoRoutingKey", true, null, info.getBytes(StandardCharsets.UTF_8)); LOG.info("消息发送成功"); } } }
结果如下:
INFO org.example.manager.rabbitmq.confirm.RoutingSender - Message Return: replyCode:312, replyText:NO_ROUTE, exchange:exchange_direct,routingKey:infoRoutingKey, properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null),body:[105, 110, 102, 111, 32, 108, 111, 103]
ReturnListener也可以按下面这样的写法:
channel.addReturnListener(new ReturnCallback() { @Override public void handle(Return returnMessage) { LOG.info("Message Return: returnMessage:{}", returnMessage); } });
消费方
默认情况下,消费端接收消息时,消息会被自动确认(ACK);需要使用RabbitMQ消费端确认机制,确认消息没有发生处理异常;
消费端ACK类型
自动ACK:消费端收到消息后,会自动签收消息;
手动ACK:消费端收到消息后,不会自动签收消息,需要在业务代码中显式签收消息;
手动ACK类型
单条手动ACK:multiple=false
单条手动ACK:multiple=true
重回队列
若设置了重回队列,消息被NACK之后,会返回队列末尾,等待进一步被处理;一般不开启重回队列,因为第一次处理异常的消息,再次处理,基本也是异常的;
消费端手动ACK确认测试代码如下:
public class RoutingReceiver2 { private final static String EXCHANGE_NAME = "exchange_direct"; private final static Logger LOG = LoggerFactory.getLogger(RoutingSender.class); public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.135"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); // 消费者一般不增加自动关闭 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 绑定交换机,fanout扇形,即广播类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 获取队列 String queueName = channel.queueDeclare().getQueue(); // 绑定队列和交换机,另外一个节点只绑定一个 errorRoutingKey channel.queueBind(queueName, EXCHANGE_NAME, "infoRoutingKey"); DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { String msg = new String(message.getBody(), StandardCharsets.UTF_8); LOG.info(" [x] Received '" + msg + "'"); channel.basicAck(message.getEnvelope().getDeliveryTag(), false); } }; // 取消自动确认消息 channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { }); } }
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); 第二个参数为multiple,true为处理多条ACK,false为处理单条ACK;
当消息生产者发送消息时,消息消费者会进入消息回调处理;deliveryTag表示消息投递序号,每次消费消息或者消息重新投递后, deliveryTag都会增加1;
重回队列
在调用channel#basicNack方法,将requeue参数置为true;
如下:
DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { String msg = new String(message.getBody(), StandardCharsets.UTF_8); LOG.info(" [x] Received '" + msg + "'"); // channel.basicAck(message.getEnvelope().getDeliveryTag(), false); channel.basicNack(message.getEnvelope().getDeliveryTag(), false, true); } };
消费方会重复处理同一条消息,当Nack应答时,会将消息重新放回队列,之后监听的是同一个消费方,最终导致死循环;
消费端限流
RabbitMQ的QoS(服务质量保证)功能,QoS保证了在一定数目的消息未被确认前,不消费新的消息,注:不使用自动确认;
消费端限流参数配置:
prefetchCount:针对一个消费端最多推送多少未确认消息;
global:true,针对整个消费端限流;false,针对整个Channel;
prefetchSize:0(单个消息大小限制,一般为0);
后两项是AMQP的,而RabbitMQ暂未实现;
先不设置QoS,模拟消费延时,设置手动ACK,消费端测试代码:
public class RoutingReceiver3 { private final static String EXCHANGE_NAME = "exchange_direct"; private final static Logger LOG = LoggerFactory.getLogger(RoutingSender.class); public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.135"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); // 消费者一般不增加自动关闭 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 绑定交换机,fanout扇形,即广播类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 获取队列 String queueName = channel.queueDeclare().getQueue(); // 绑定队列和交换机,另外一个节点只绑定一个 errorRoutingKey channel.queueBind(queueName, EXCHANGE_NAME, "infoRoutingKey"); DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { String msg = new String(message.getBody(), StandardCharsets.UTF_8); LOG.info(" [x] Received '" + msg + "'"); // 模拟耗时 try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // channel.basicNack(message.getEnvelope().getDeliveryTag(), false, true); } }; // 取消自动确认消息 channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { }); } }
消息生产者发送50条消息;从管控台看Reday为0 ,Unacked为42,Total为42;没有设置QoS之,所有的消息都推给了这个消费者,即使在部署一个消费节点,也不能改善消息堆积的问题,之前的消息都已经推到第一个消费者的内存里,此时没有Ready的消息,新的消费者不能消费堆积的消息,只有第一个消费者把所有Unacked的消息处理完,消息堆积才消失;
设置QoS
//限制消费者每次只能消费一条消息,处理完成后才能处理下一条 int fetchCount = 2; channel.basicQos(fetchCount);
消息生成者同样发送50条消息,此时只有2条消息是处于Unacked状态的;
RabbitMQ自身
大量堆积的消息会给RabbitMQ产生很大的压力,需要使用RabbitMQ消息过期时间,防止消息大量堆积;
RabbitMQ的过期时间为TTL,生存时间;RabbitMQ的过期时间分为消息TTL和队列TTL;消息TTL设置了单条消息的过期时间,队列TTL设置了队列中所有的消息的过期时间;
TTL应该明显长于服务的重启时间;
设置参数:
x-message-ttl,队列所有消息的过期时间;
x-expire,这个队列的过期时间,过期之后会将队列删除,一般不用;
如果之前配置了持久化的队列,现在在程序中重新设置队列参数,这个程序运行会报错,需要在管控台将队列先删掉,之后再重新运行程序;
死信队列
过期后会直接被丢弃,无法对系统运行发出警报,需要使用RabbitMQ死信队列,收集过期消息,以供分析;
死信队列用于处理无法被正常消费的消息,即死信消息;死信队列被配置了DLX属性(Dead-Letter-Exchange);
当一个消息变成死信后,消息会被重新发布到另一个Exchange交换机上,这个Exchange也是一个普通交换机, 这个交换机就是DLX死信交换机;死信被死信交换机路由后,一般进入一个固定队列;
死信转移过程如下:
死信队列设置方法
- 设置转发,接收死信的交换机和队列:
- Exchange: dlx.exchange,dlx.exchange为参数名
- Queue: dlx.queue,dlx.queue为参数名
- RoutingKey: #,#为需要的路由键名
- 在需要设置死信队列的交换机加入参数:
- x-dead-letter-exchange = dlx.exchange,dlx.exchange为参数名
消息变成死信的条件如下:
- 消费者拒收消息(basic.reject/ basic.nack),并且没有重新入队 requeue=false;
消息生产者测试代码如下:
ublic class QueueSender { private final static String EXCHANGE_NAME = "exchange.topic"; private final static Logger LOG = LoggerFactory.getLogger(QueueSender.class); public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.135"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); /** * 消息生产者不用过多操作,只需要和交换机绑定即可 */ try (// 创建连接 Connection connection = factory.newConnection(); // 创建信道 Channel channel = connection.createChannel()) { channel.basicPublish(EXCHANGE_NAME, "dlx", true, null, "dlx test".getBytes(StandardCharsets.UTF_8)); LOG.info("消息发送成功"); } } }
消息消费者测试代码如下:
public class QueueReceiver { private final static Logger LOG = LoggerFactory.getLogger(QueueReceiver.class); private final static String EXCHANGE_NAME = "exchange.topic"; private final static String QUEUE_NAME = "queue.topic"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.211.135"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); /** * 消息生产者不用过多操作,只需要和交换机绑定即可 */ try (// 创建连接 Connection connection = factory.newConnection(); // 创建信道 Channel channel = connection.createChannel()) { // 声明接收死信的交换机 channel.exchangeDeclare("exchange.dlx", BuiltinExchangeType.TOPIC, true, false, null); // 声明接收死信的队列 channel.queueDeclare("queue.dlx", true, false, false, null); // 声明接收死信的绑定 channel.queueBind("queue.dlx", "exchange.dlx", "#"); // 声明队列时,携带以下参数: HashMap<String, Object> args = new HashMap<>(16); args.put("x-dead-letter-exchange", "exchange.dlx"); // 绑定交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, false, false, null); // 绑定队列 channel.queueDeclare(QUEUE_NAME, true, false, false, args); // 绑定路由键 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "#"); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); LOG.info(" [x] Received '" + msg + "'"); // channel.basicAck(message.getEnvelope().getDeliveryTag(), false); channel.basicNack(message.getEnvelope().getDeliveryTag(), false, false); }; // 自动确认消息 channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); while (true) { synchronized (QueueReceiver.class) { QueueReceiver.class.wait(); } } } } }
测试前,需要在管控台清理之前的队列;
程序完成后,可以从管控台看出,配置了死信队列属性(x-dead-letter-exchange)的队列才是死信队列,即测试代码中的queue.topic队列,而queue.dlx为接收死信的队列,当有消息变成死信后,消息会被重新发送到这个交换机;
接收死信的队列的信息如下:
- 消息在队列中未被消费,且超过队列或者消息本身的过期时间TTL(time-to-live);
-
-
队列过期时间使用参数,对整个队列消息统一过期
- x-message-ttl
- 单位ms(毫秒)
-
消息过期时间使用参数(如果队列头部消息未过期,队列中级消息已经过期,已经还在队列里面)
- expiration
- 单位ms(毫秒)
-
两者都配置的话,时间短的先触发
-
消息消费者的queue.topic队列添加属性:
args.put("x-message-ttl", 15000);
当消息生产者投递消息后,一开始消息是投递到queue.topic队列;
此时消息消费者一直不处理(以关闭应用模拟),15秒后,消息会重新投递到queue.dlx队列;
可以看到进入接收死信的队列是因为消息过期;
- 队列超过最大长度的消息会转发到死信交换机,死信交换机再将消息路由到一个普通的队列;
消息消费者的queue.topic队列添加属性:
args.put("x-message-ttl", 150000); args.put("x-max-length", 3);
消息生产者修改:
for (int i = 0; i < 5; i++) { channel.basicPublish(EXCHANGE_NAME, "dlx", true, null, "dlx test".getBytes(StandardCharsets.UTF_8)); LOG.info("消息发送成功"); }
消息发送完毕后,有2条消息被重新投递到了queue.dlx队列,效果如下:
第一条消息
第二条消息
消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列;